This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push: new c076383e Improve topology integration / replay c076383e is described below commit c076383eb432670c4d919e9fd0db76296169ea00 Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Fri Dec 6 15:58:16 2024 +0100 Improve topology integration / replay - integrate topology reloading into burn test - improve epoch handling Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20142 --- .../main/java/accord/api/ConfigurationService.java | 5 +- accord-core/src/main/java/accord/api/Journal.java | 58 ++++++++++-- .../accord/impl/AbstractConfigurationService.java | 35 ++++++- .../src/main/java/accord/impl/CommandChange.java | 4 +- .../java/accord/impl/InMemoryCommandStore.java | 4 +- .../java/accord/impl/InMemoryCommandStores.java | 17 ++-- .../src/main/java/accord/local/Bootstrap.java | 21 +++-- .../src/main/java/accord/local/CommandStore.java | 2 + .../src/main/java/accord/local/CommandStores.java | 104 +++++++++++++++++++-- accord-core/src/main/java/accord/local/Node.java | 9 +- .../main/java/accord/topology/TopologyManager.java | 7 +- accord-core/src/test/java/accord/Utils.java | 4 +- .../src/test/java/accord/impl/basic/Cluster.java | 69 +++++++------- .../accord/impl/basic/DelayedCommandStores.java | 93 ++++++++++++------ .../java/accord/impl/basic/InMemoryJournal.java | 41 ++++++++ .../java/accord/impl/basic/LoggingJournal.java | 17 ++++ .../src/test/java/accord/impl/list/ListStore.java | 7 +- .../test/java/accord/impl/mock/MockCluster.java | 10 +- .../java/accord/local/ImmutableCommandTest.java | 10 +- .../src/main/java/accord/maelstrom/Cluster.java | 32 ++++++- .../src/main/java/accord/maelstrom/Main.java | 4 +- 21 files changed, 429 insertions(+), 124 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index 57678614..3650f274 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -18,7 +18,6 @@ package accord.api; -import java.util.Collection; import javax.annotation.Nullable; import accord.local.Node; @@ -129,6 +128,8 @@ public interface ConfigurationService * the initial topology returned by `currentTopology` on startup. * * TODO (desired): document what this Future represents, or maybe refactor it away - only used for testing + * + * * {@param isLoad} - whether current topology update is being loaded from the local node during startup */ AsyncResult<Void> onTopologyUpdate(Topology topology, boolean isLoad, boolean startSync); @@ -160,7 +161,7 @@ public interface ConfigurationService */ void onEpochRedundant(Ranges ranges, long epoch); - default void onRemoveNodes(long epoch, Collection<Node.Id> removed) {} + default void onRemoveNode(long epoch, Node.Id removed) {} } void registerListener(Listener listener); diff --git a/accord-core/src/main/java/accord/api/Journal.java b/accord-core/src/main/java/accord/api/Journal.java index 2fd70165..a201653d 100644 --- a/accord-core/src/main/java/accord/api/Journal.java +++ b/accord-core/src/main/java/accord/api/Journal.java @@ -18,7 +18,9 @@ package accord.api; +import java.util.Iterator; import java.util.NavigableMap; +import java.util.Objects; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -29,31 +31,75 @@ import accord.local.RedundantBefore; import accord.primitives.Ranges; import accord.primitives.Timestamp; import accord.primitives.TxnId; +import accord.topology.Topology; import accord.utils.PersistentField.Persister; +import org.agrona.collections.Int2ObjectHashMap; /** * Persisted journal for transactional recovery. */ public interface Journal { - Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); - Command.Minimal loadMinimal(int commandStoreId, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore); + Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore); + Command.Minimal loadMinimal(int store, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore); // TODO (required): use OnDone instead of Runnable void saveCommand(int store, CommandUpdate value, Runnable onFlush); + Iterator<TopologyUpdate> replayTopologies(); // reverse iterator + void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush); + void purge(CommandStores commandStores); void replay(CommandStores commandStores); - RedundantBefore loadRedundantBefore(int commandStoreId); - NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int commandStoreId); - NavigableMap<Timestamp, Ranges> loadSafeToRead(int commandStoreId); - CommandStores.RangesForEpoch loadRangesForEpoch(int commandStoreId); + RedundantBefore loadRedundantBefore(int store); + NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store); + NavigableMap<Timestamp, Ranges> loadSafeToRead(int store); + CommandStores.RangesForEpoch loadRangesForEpoch(int store); Persister<DurableBefore, DurableBefore> durableBeforePersister(); void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush); + class TopologyUpdate + { + public final Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores; + public final Topology local; + public final Topology global; + + public TopologyUpdate(@Nonnull Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores, @Nonnull Topology local, @Nonnull Topology global) + { + this.commandStores = commandStores; + this.local = local; + this.global = global; + } + + @Override + public boolean equals(Object object) + { + if (this == object) return true; + if (object == null || getClass() != object.getClass()) return false; + TopologyUpdate update = (TopologyUpdate) object; + return Objects.equals(commandStores, update.commandStores) && Objects.equals(local, update.local) && Objects.equals(global, update.global); + } + + @Override + public int hashCode() + { + return Objects.hash(commandStores, local, global); + } + + @Override + public String toString() + { + return "TopologyUpdate{" + + "local=" + local + + ", commandStores=" + commandStores + + ", global=" + global + + '}'; + } + } + class CommandUpdate { public final TxnId txnId; diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index b2614b72..2a8fb94d 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -140,7 +140,8 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo synchronized EpochState getOrCreate(long epoch) { - Invariants.checkArgument(epoch > 0, "Epoch must be positive but given %d", epoch); + Invariants.checkArgument(epoch >= 0, "Epoch must be non-negative but given %d", epoch); + Invariants.checkArgument(epoch > 0 || (lastReceived == 0 && epochs.isEmpty()), "Received epoch 0 after initialization. Last received %d, epochsf; %s", lastReceived, epochs); if (epochs.isEmpty()) { EpochState state = createEpochState(epoch); @@ -177,8 +178,20 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo { long epoch = topology.epoch(); logger.debug("Receiving epoch {}", epoch); - Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0, - "Epoch %d != %d + 1", epoch, lastReceived); + if (lastReceived >= epoch) + { + // If we have already seen the epoch + for (int i = epochs.size() - 1; i >= 0; i--) + { + EpochState expected = epochs.get(i); + if (epoch == expected.epoch) + { + Invariants.checkState(topology.equals(expected.topology), + "Expected existing topology to match upsert, but %s != %s", topology, expected.topology); + return; + } + } + } state = getOrCreate(epoch); state.topology = topology; lastReceived = epoch; @@ -286,7 +299,20 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo maxRequestedEpoch = epoch; } - fetchTopologyInternal(epoch); + + try + { + fetchTopologyInternal(epoch); + } + catch (Throwable t) + { + // This epoch will not be fetched, so we need to reset it back + synchronized (this) + { + maxRequestedEpoch = 0; + } + throw t; + } } // TODO (expected): rename, sync is too ambiguous @@ -325,6 +351,7 @@ public abstract class AbstractConfigurationService<EpochState extends AbstractCo epochs.acknowledgeFuture(epochs.minEpoch()).addCallback(() -> reportTopology(topology, startSync, isLoad)); return; } + if (lastAcked > 0 && topology.epoch() > lastAcked + 1) { logger.debug("Epoch {} received; waiting for {} to ack before reporting", topology.epoch(), lastAcked + 1); diff --git a/accord-core/src/main/java/accord/impl/CommandChange.java b/accord-core/src/main/java/accord/impl/CommandChange.java index e2c450d9..df74bf7e 100644 --- a/accord-core/src/main/java/accord/impl/CommandChange.java +++ b/accord-core/src/main/java/accord/impl/CommandChange.java @@ -511,6 +511,8 @@ public class CommandChange public static int getFlags(Command before, Command after) { int flags = 0; + if (before == null && after == null) + return flags; flags = collectFlags(before, after, Command::executeAt, true, EXECUTE_AT, flags); flags = collectFlags(before, after, Command::executesAtLeast, true, EXECUTES_AT_LEAST, flags); @@ -531,7 +533,7 @@ public class CommandChange // Special-cased for Journal BurnTest integration if ((before != null && after.result() != before.result()) || - (before == null && after.result() != null)) //TODO + (before == null && after.result() != null)) { flags = collectFlags(before, after, Command::writes, false, RESULT, flags); } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 223fd1cc..71df36f6 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -124,9 +124,9 @@ public abstract class InMemoryCommandStore extends CommandStore private InMemorySafeStore current; private final Journal.Loader loader; - public InMemoryCommandStore(int id, NodeCommandStoreService time, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) + public InMemoryCommandStore(int id, NodeCommandStoreService node, Agent agent, DataStore store, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, EpochUpdateHolder epochUpdateHolder) { - super(id, time, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); + super(id, node, agent, store, progressLogFactory, listenersFactory, epochUpdateHolder); this.loader = new CommandLoader(this); } diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java index e1838d69..9e659df6 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStores.java @@ -18,6 +18,7 @@ package accord.impl; +import accord.api.Journal; import accord.api.LocalListeners; import accord.local.*; import accord.api.Agent; @@ -29,30 +30,30 @@ public class InMemoryCommandStores { public static class Synchronized extends CommandStores { - public Synchronized(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public Synchronized(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(node, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new); + super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Synchronized::new); } } public static class SingleThread extends CommandStores { - public SingleThread(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public SingleThread(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(node, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new); + super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.SingleThread::new); } - public SingleThread(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) + public SingleThread(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) { - super(node, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, shardFactory); + super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, shardFactory); } } public static class Debug extends InMemoryCommandStores.SingleThread { - public Debug(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) + public Debug(NodeCommandStoreService node, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory) { - super(node, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new); + super(node, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, InMemoryCommandStore.Debug::new); } } } diff --git a/accord-core/src/main/java/accord/local/Bootstrap.java b/accord-core/src/main/java/accord/local/Bootstrap.java index 5a2012ca..c466a84a 100644 --- a/accord-core/src/main/java/accord/local/Bootstrap.java +++ b/accord-core/src/main/java/accord/local/Bootstrap.java @@ -140,14 +140,19 @@ class Bootstrap Ranges commitRanges = valid; safeStore = safeStore; // we submit a separate execution so that we know markBootstrapping is durable before we initiate the fetch - safeStore.commandStore().submit(empty(), safeStore0 -> { - store.markBootstrapping(safeStore0, globalSyncId, commitRanges); - return CoordinateSyncPoint.exclusiveSyncPoint(node, globalSyncId, commitRanges); - }).flatMap(i -> i).flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(empty(), safeStore1 -> { - if (valid.isEmpty()) // we've lost ownership of the range - return AsyncResults.success(Ranges.EMPTY); - return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); - }))).flatMap(i -> i).begin(this); + safeStore.commandStore() + .submit(empty(), safeStore0 -> { + store.markBootstrapping(safeStore0, globalSyncId, commitRanges); + return CoordinateSyncPoint.exclusiveSyncPoint(node, globalSyncId, commitRanges); + }) + .flatMap(i -> i) + .flatMap(syncPoint -> node.withEpoch(epoch, () -> store.submit(empty(), safeStore1 -> { + if (valid.isEmpty()) // we've lost ownership of the range + return AsyncResults.success(Ranges.EMPTY); + return fetch = safeStore1.dataStore().fetch(node, safeStore1, valid, syncPoint, this); + }))) + .flatMap(i -> i) + .begin(this); } // we no longer want to fetch these ranges (perhaps we no longer own them) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 4873a747..9a0efc42 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -208,6 +208,8 @@ public abstract class CommandStore implements AgentExecutor return id; } + public void restore() {}; + public abstract Journal.Loader loader(); @Override diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 83050e9c..09d90163 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -20,6 +20,7 @@ package accord.local; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory; import accord.api.Agent; import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; +import accord.api.Journal; import accord.api.LocalListeners; import accord.api.ProgressLog; import accord.api.RoutingKey; @@ -61,6 +63,7 @@ import accord.utils.MapReduceConsume; import accord.utils.RandomSource; import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResults; import accord.utils.async.Cancellable; import org.agrona.collections.Hashing; import org.agrona.collections.Int2ObjectHashMap; @@ -86,6 +89,7 @@ public abstract class CommandStores Agent agent, DataStore store, RandomSource random, + Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory); @@ -128,6 +132,17 @@ public abstract class CommandStores this.store = store; } + private ShardHolder(CommandStore store, RangesForEpoch ranges) + { + this.store = store; + this.ranges = ranges; + } + + public ShardHolder withStoreUnsafe(CommandStore store) + { + return new ShardHolder(store, ranges); + } + RangesForEpoch ranges() { return ranges; @@ -343,40 +358,96 @@ public abstract class CommandStores } } - protected static class Snapshot + // This method should only be used on node startup. + // "Unsafe" because it relies on user to synchronise and sequence the call properly. + public void restoreShardStateUnsafe(Consumer<Topology> reportTopology) + { + Iterator<Journal.TopologyUpdate> iter = journal.replayTopologies(); + // First boot + if (!iter.hasNext()) + return; + + Journal.TopologyUpdate lastUpdate = null; + while (iter.hasNext()) + { + Journal.TopologyUpdate update = iter.next(); + reportTopology.accept(update.global); + if (lastUpdate == null || update.global.epoch() > lastUpdate.global.epoch()) + lastUpdate = update; + } + + ShardHolder[] shards = new ShardHolder[lastUpdate.commandStores.size()]; + int i = 0; + for (Map.Entry<Integer, RangesForEpoch> e : lastUpdate.commandStores.entrySet()) + { + RangesForEpoch ranges = e.getValue(); + CommandStore commandStore = null; + for (ShardHolder shard : current.shards) + { + if (shard.ranges.equals(ranges)) + commandStore = shard.store; + } + Invariants.nonNull(commandStore, "Command store should have been reloaded").restore(); + ShardHolder shard = new ShardHolder(commandStore, e.getValue()); + shards[i++] = shard; + } + + loadSnapshot(new Snapshot(shards, lastUpdate.local, lastUpdate.global)); + } + + protected void loadSnapshot(Snapshot toLoad) + { + current = toLoad; + } + + protected static class Snapshot extends Journal.TopologyUpdate { public final ShardHolder[] shards; - final Int2ObjectHashMap<CommandStore> byId; - final Topology local; - final Topology global; + public final Int2ObjectHashMap<CommandStore> byId; Snapshot(ShardHolder[] shards, Topology local, Topology global) { + super(asMap(shards), local, global); this.shards = shards; this.byId = new Int2ObjectHashMap<>(shards.length, Hashing.DEFAULT_LOAD_FACTOR, true); for (ShardHolder shard : shards) byId.put(shard.store.id(), shard.store); - this.local = local; - this.global = global; + } + + // This method exists to ensure we do not hold references to command stores + public Journal.TopologyUpdate asTopologyUpdate() + { + return new Journal.TopologyUpdate(commandStores, local, global); + } + + private static Int2ObjectHashMap<CommandStores.RangesForEpoch> asMap(ShardHolder[] shards) + { + Int2ObjectHashMap<CommandStores.RangesForEpoch> commandStores = new Int2ObjectHashMap<>(); + for (ShardHolder shard : shards) + commandStores.put(shard.store.id, shard.ranges); + return commandStores; } } final StoreSupplier supplier; final ShardDistributor shardDistributor; + final Journal journal; volatile Snapshot current; int nextId; - private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor) + private CommandStores(StoreSupplier supplier, ShardDistributor shardDistributor, Journal journal) { this.supplier = supplier; this.shardDistributor = shardDistributor; + this.current = new Snapshot(new ShardHolder[0], Topology.EMPTY, Topology.EMPTY); + this.journal = journal; } - public CommandStores(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, + public CommandStores(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, Journal journal, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, CommandStore.Factory shardFactory) { - this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory), shardDistributor); + this(new StoreSupplier(time, agent, store, random, progressLogFactory, listenersFactory, shardFactory), shardDistributor, journal); } public Topology local() @@ -693,7 +764,20 @@ public abstract class CommandStores public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology, boolean startSync) { TopologyUpdate update = updateTopology(node, current, newTopology, startSync); - current = update.snapshot; + if (update.snapshot != current) + { + AsyncResults.SettableResult<Void> flush = new AsyncResults.SettableResult<>(); + journal.saveTopology(update.snapshot.asTopologyUpdate(), () -> flush.setSuccess(null)); + current = update.snapshot; + return () -> { + EpochReady ready = update.bootstrap.get(); + return new EpochReady(ready.epoch, + flush.flatMap(ignore -> ready.metadata).beginAsResult(), + flush.flatMap(ignore -> ready.coordinate).beginAsResult(), + flush.flatMap(ignore -> ready.data).beginAsResult(), + flush.flatMap(ignore -> ready.reads).beginAsResult()); + }; + } return update.bootstrap; } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index ff3f55bd..254280e0 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -43,6 +43,7 @@ import accord.api.Agent; import accord.api.ConfigurationService; import accord.api.ConfigurationService.EpochReady; import accord.api.DataStore; +import accord.api.Journal; import accord.api.LocalConfig; import accord.api.LocalListeners; import accord.api.MessageSink; @@ -184,7 +185,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ Function<Node, RemoteListeners> remoteListenersFactory, Function<Node, Timeouts> requestTimeoutsFactory, Function<Node, ProgressLog.Factory> progressLogFactory, Function<Node, LocalListeners.Factory> localListenersFactory, CommandStores.Factory factory, CoordinationAdapter.Factory coordinationAdapters, Persister<DurableBefore, DurableBefore> durableBeforePersister, - LocalConfig localConfig) + LocalConfig localConfig, Journal journal) { this.id = id; this.scheduler = scheduler; // we set scheduler first so that e.g. requestTimeoutsFactory and progressLogFactory can take references to it @@ -201,7 +202,7 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ this.agent = agent; this.random = random; this.persistDurableBefore = new PersistentField<>(() -> durableBefore, DurableBefore::merge, durableBeforePersister, this::setPersistedDurableBefore); - this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); + this.commandStores = factory.create(this, agent, dataSupplier.get(), random.fork(), journal, shardDistributor, progressLogFactory.apply(this), localListenersFactory.apply(this)); this.durabilityScheduling = new DurabilityScheduling(this); // TODO (desired): make frequency configurable scheduler.recurring(() -> commandStores.forEachCommandStore(store -> store.progressLog.maybeNotify()), 1, SECONDS); @@ -355,9 +356,9 @@ public class Node implements ConfigurationService.Listener, NodeCommandStoreServ } @Override - public void onRemoveNodes(long epoch, Collection<Id> removed) + public void onRemoveNode(long epoch, Id removed) { - topology.onRemoveNodes(epoch, removed); + topology.onRemoveNode(epoch, removed); } @Override diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 933280c4..6e66be24 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -20,7 +20,6 @@ package accord.topology; import java.util.ArrayList; import java.util.BitSet; -import java.util.Collection; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -554,14 +553,14 @@ public class TopologyManager epochs.syncComplete(node, epoch); } - public synchronized void onRemoveNodes(long removedIn, Collection<Id> removed) + // TODO (now, correctness): it seems to be wrong to count removed nodes towards sync quorums. + public synchronized void onRemoveNode(long removedIn, Id removed) { for (long epoch = removedIn, min = minEpoch(); epoch >= min; epoch--) { EpochState state = epochs.get(epoch); if (state == null || state.hasReachedQuorum()) continue; - for (Id node : removed) - epochs.syncComplete(node, epoch); + epochs.syncComplete(removed, epoch); } } diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index b2b04cae..7d188504 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -44,6 +44,7 @@ import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; +import accord.impl.basic.InMemoryJournal; import accord.impl.list.ListQuery; import accord.impl.list.ListRead; import accord.impl.list.ListUpdate; @@ -199,7 +200,8 @@ public class Utils InMemoryCommandStores.Synchronized::new, new CoordinationAdapter.DefaultFactory(), DurableBefore.NOOP_PERSISTER, - localConfig); + localConfig, + new InMemoryJournal(nodeId, agent)); awaitUninterruptibly(node.unsafeStart()); return node; } diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index bc5c4247..c8b65168 100644 --- a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -443,8 +443,7 @@ public class Cluster () -> queue, (id, onStale) -> globalExecutor.withAgent(agentSupplier.apply(onStale)), queue::checkFailures, - ignore -> { - }, + ignore -> {}, randomSupplier, timeServiceSupplier, topologyFactory, @@ -675,8 +674,8 @@ public class Cluster () -> new ListStore(scheduler, random, id), new ShardDistributor.EvenSplit<>(8, ignore -> new PrefixedIntHashKey.Splitter()), nodeExecutor.agent(), randomSupplier.get(), scheduler, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultTimeouts::new, - DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, cacheLoading, journal), new CoordinationAdapter.DefaultFactory(), - DurableBefore.NOOP_PERSISTER, localConfig); + DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, DelayedCommandStores.factory(sinks.pending, cacheLoading), new CoordinationAdapter.DefaultFactory(), + DurableBefore.NOOP_PERSISTER, localConfig, journal); DurabilityScheduling durability = node.durabilityScheduling(); // TODO (desired): randomise durability.setShardCycleTime(30, SECONDS); @@ -729,36 +728,38 @@ public class Cluster Id id = pickNodeNotBootstrapping(random, nodesList, nodeMap); if (id == null) return; - CommandStore[] stores = nodeMap.get(id).commandStores().all(); - - ((DelayedCommandStore)stores[0]).unsafeRunIn(() -> { - Predicate<Pending> pred = getPendingPredicate(id.id, stores); - while (sinks.drain(pred)); - - // Journal cleanup is a rough equivalent of a node restart. - trace.debug("Triggering journal cleanup for node " + id); - CommandsForKey.disableLinearizabilityViolationsReporting(); - ListStore listStore = (ListStore) nodeMap.get(id).commandStores().dataStore(); - NavigableMap<RoutableKey, Timestamped<int[]>> prevData = listStore.copyOfCurrentData(); - listStore.clear(); - listStore.restoreFromSnapshot(); - // we are simulating node restart, so its remote listeners will also be gone - ((DefaultRemoteListeners)nodeMap.get(id).remoteListeners()).clear(); - Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores); - for (CommandStore s : stores) - { - DelayedCommandStores.DelayedCommandStore store = (DelayedCommandStores.DelayedCommandStore) s; - store.clearForTesting(); - } - Journal journal = journalMap.get(id); - journal.replay(nodeMap.get(id).commandStores()); - while (sinks.drain(pred)); - CommandsForKey.enableLinearizabilityViolationsReporting(); - verifyConsistentRestore(beforeStores, stores); - // we can get ahead of prior state by executing further if we skip some earlier phase's dependencies - listStore.checkAtLeast(stores, prevData); - trace.debug("Done with cleanup."); - }); + + CommandStores stores = nodeMap.get(id).commandStores(); + while (sinks.drain(getPendingPredicate(id.id, stores.all()))) ; + + trace.debug("Triggering store cleanup and journal replay for node " + id); + CommandsForKey.disableLinearizabilityViolationsReporting(); + + // Clean data and restore from snapshot + ListStore listStore = (ListStore) nodeMap.get(id).commandStores().dataStore(); + NavigableMap<RoutableKey, Timestamped<int[]>> prevData = listStore.copyOfCurrentData(); + listStore.clear(); + listStore.restoreFromSnapshot(); + + // We are simulating node restart, so its remote listeners will also be gone + ((DefaultRemoteListeners) nodeMap.get(id).remoteListeners()).clear(); + Int2ObjectHashMap<NavigableMap<TxnId, Command>> beforeStores = copyCommands(stores.all()); + + // Re-create all command stores + nodeMap.get(id).commandStores().restoreShardStateUnsafe(t -> {}); + stores = nodeMap.get(id).commandStores(); + + // Replay journal + Journal journal = journalMap.get(id); + journal.replay(stores); + + // Re-enable safety checks + while (sinks.drain(getPendingPredicate(id.id, stores.all()))) ; + CommandsForKey.enableLinearizabilityViolationsReporting(); + verifyConsistentRestore(beforeStores, stores.all()); + // we can get ahead of prior state by executing further if we skip some earlier phase's dependencies + listStore.checkAtLeast(stores, prevData); + trace.debug("Done with replay."); }, () -> random.nextInt(10, 30), SECONDS); durabilityScheduling.forEach(DurabilityScheduling::start); diff --git a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java index e9b3f735..2e034aca 100644 --- a/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java +++ b/accord-core/src/test/java/accord/impl/basic/DelayedCommandStores.java @@ -71,12 +71,12 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread { private DelayedCommandStores(NodeCommandStoreService time, Agent agent, DataStore store, RandomSource random, ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory, LocalListeners.Factory listenersFactory, SimulatedDelayedExecutorService executorService, CacheLoading isLoadedCheck, Journal journal) { - super(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal)); + super(time, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory, DelayedCommandStore.factory(executorService, isLoadedCheck, journal)); } - public static CommandStores.Factory factory(PendingQueue pending, CacheLoading isLoadedCheck, Journal journal) + public static CommandStores.Factory factory(PendingQueue pending, CacheLoading isLoadedCheck) { - return (time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory) -> + return (time, agent, store, random, journal, shardDistributor, progressLogFactory, listenersFactory) -> new DelayedCommandStores(time, agent, store, random, shardDistributor, progressLogFactory, listenersFactory, new SimulatedDelayedExecutorService(pending, agent), isLoadedCheck, journal); } @@ -91,6 +91,42 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread return contains(previous, prefix); } + protected void loadSnapshot(Snapshot nextSnapshot) + { + Snapshot current = current(); + // These checks are only applicable to delayed command stores. + for (Integer id : current.byId.keySet()) + { + CommandStore prev = current.byId.get(id); + CommandStore next = nextSnapshot.byId.get(id); + { + RedundantBefore orig = prev.unsafeGetRedundantBefore(); + RedundantBefore loaded = next.unsafeGetRedundantBefore(); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + NavigableMap<TxnId, Ranges> orig = prev.unsafeGetBootstrapBeganAt(); + NavigableMap<TxnId, Ranges> loaded = next.unsafeGetBootstrapBeganAt(); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + NavigableMap<Timestamp, Ranges> orig = prev.unsafeGetSafeToRead(); + NavigableMap<Timestamp, Ranges> loaded = next.unsafeGetSafeToRead(); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + + { + RangesForEpoch orig = prev.unsafeGetRangesForEpoch(); + RangesForEpoch loaded = next.unsafeGetRangesForEpoch(); + Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); + } + } + + super.loadSnapshot(nextSnapshot); + } + private static boolean contains(Topology previous, int searchPrefix) { for (Range range : previous.ranges()) @@ -139,38 +175,39 @@ public class DelayedCommandStores extends InMemoryCommandStores.SingleThread this.executor = executor; this.cacheLoading = cacheLoading; this.journal = journal; + restore(); } - @Override - public void clearForTesting() + protected void loadRedundantBefore(RedundantBefore redundantBefore) { - super.clearForTesting(); - - // Rather than cleaning up and reloading, we can just assert equality during reload - { - RedundantBefore orig = unsafeGetRedundantBefore(); - RedundantBefore loaded = journal.loadRedundantBefore(id()); - Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); - } + if (redundantBefore != null) + unsafeSetRedundantBefore(redundantBefore); + } - { - NavigableMap<TxnId, Ranges> orig = unsafeGetBootstrapBeganAt(); - NavigableMap<TxnId, Ranges> loaded = journal.loadBootstrapBeganAt(id()); - Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); - } + protected void loadBootstrapBeganAt(NavigableMap<TxnId, Ranges> bootstrapBeganAt) + { + if (bootstrapBeganAt != null) + unsafeSetBootstrapBeganAt(bootstrapBeganAt); + } - { - NavigableMap<Timestamp, Ranges> orig = unsafeGetSafeToRead(); - NavigableMap<Timestamp, Ranges> loaded = journal.loadSafeToRead(id()); - Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); - } + protected void loadSafeToRead(NavigableMap<Timestamp, Ranges> safeToRead) + { + if (safeToRead != null) + unsafeSetSafeToRead(safeToRead); + } - { - RangesForEpoch orig = unsafeGetRangesForEpoch(); - RangesForEpoch loaded = journal.loadRangesForEpoch(id()); + protected void loadRangesForEpoch(CommandStores.RangesForEpoch rangesForEpoch) + { + if (rangesForEpoch != null) + unsafeSetRangesForEpoch(rangesForEpoch); + } - Invariants.checkState(orig.equals(loaded), "%s should equal %s", loaded, orig); - } + public void restore() + { + loadRedundantBefore(journal.loadRedundantBefore(id())); + loadBootstrapBeganAt(journal.loadBootstrapBeganAt(id())); + loadSafeToRead(journal.loadSafeToRead(id())); + loadRangesForEpoch(journal.loadRangesForEpoch(id())); } @Override diff --git a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java index 59f16268..bc2824e5 100644 --- a/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/InMemoryJournal.java @@ -21,6 +21,7 @@ package accord.impl.basic; import java.util.AbstractList; import java.util.ArrayList; import java.util.EnumMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -91,6 +92,7 @@ import static accord.utils.Invariants.illegalState; public class InMemoryJournal implements Journal { private final Int2ObjectHashMap<NavigableMap<TxnId, List<Diff>>> diffsPerCommandStore = new Int2ObjectHashMap<>(); + private final List<TopologyUpdate> topologyUpdates = new ArrayList<>(); private final Int2ObjectHashMap<FieldUpdates> fieldStates = new Int2ObjectHashMap<>(); private final Node.Id id; @@ -101,6 +103,7 @@ public class InMemoryJournal implements Journal this.id = id; this.agent = agent; } + @Override public Command loadCommand(int commandStoreId, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { @@ -190,6 +193,44 @@ public class InMemoryJournal implements Journal onFlush.run(); } + @Override + public Iterator<TopologyUpdate> replayTopologies() + { + return new Iterator<>() + { + int current = 0; + public boolean hasNext() + { + return current < topologyUpdates.size(); + } + + public TopologyUpdate next() + { + return topologyUpdates.get(current++); + } + }; + } + + @Override + public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) + { + topologyUpdates.add(topologyUpdate); + if (onFlush != null) + onFlush.run(); + } + + public void truncateTopologiesForTesting(long minEpoch) + { + List<TopologyUpdate> next = new ArrayList<>(); + for (int i = 0; i < topologyUpdates.size(); i++) + { + TopologyUpdate update = topologyUpdates.get(i); + if (update.global.epoch() >= minEpoch) + next.add(update); + } + topologyUpdates.retainAll(next); + } + @Override public RedundantBefore loadRedundantBefore(int commandStoreId) { diff --git a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java index b40134a3..564fe00b 100644 --- a/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java +++ b/accord-core/src/test/java/accord/impl/basic/LoggingJournal.java @@ -24,6 +24,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; +import java.util.Iterator; import java.util.NavigableMap; import accord.api.Journal; @@ -90,6 +91,22 @@ public class LoggingJournal implements Journal delegate.saveCommand(store, update, onFlush); } + @Override + public Iterator<TopologyUpdate> replayTopologies() + { + log("REPLAY TOPOLOGIES\n"); + return delegate.replayTopologies(); + } + + @Override + public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) + { + log("%d: %s\n", topologyUpdate); + if (onFlush != null) + onFlush.run(); + throw new IllegalArgumentException(); + } + @Override public void purge(CommandStores commandStores) { diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index c4bfa4d4..7953ce48 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -46,6 +46,7 @@ import accord.coordinate.Truncated; import accord.coordinate.tracking.AllTracker; import accord.impl.basic.SimulatedFault; import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.SafeCommandStore; import accord.primitives.Range; @@ -518,12 +519,12 @@ public class ListStore implements DataStore return new TreeMap<>(data); } - public void checkAtLeast(CommandStore[] commandStores, NavigableMap<RoutableKey, Timestamped<int[]>> a) + public void checkAtLeast(CommandStores commandStores, NavigableMap<RoutableKey, Timestamped<int[]>> a) { checkAtLeast(commandStores, a, data); } - public static void checkAtLeast(CommandStore[] commandStores, NavigableMap<RoutableKey, Timestamped<int[]>> a, NavigableMap<RoutableKey, Timestamped<int[]>> b) + public static void checkAtLeast(CommandStores commandStores, NavigableMap<RoutableKey, Timestamped<int[]>> a, NavigableMap<RoutableKey, Timestamped<int[]>> b) { if (a.isEmpty()) return; @@ -534,7 +535,7 @@ public class ListStore implements DataStore Timestamped<int[]> bv = b.get(k); if (bv == null || bv.timestamp.compareTo(av.timestamp) < 0) { - for (CommandStore commandStore : commandStores) + for (CommandStore commandStore : commandStores.all()) { if (!commandStore.unsafeGetRangesForEpoch().allSince(av.timestamp.epoch()).contains(k)) continue; diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index 39c7b6de..a13c7deb 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.NetworkFilter; +import accord.api.Agent; +import accord.api.Journal; import accord.api.MessageSink; import accord.api.LocalConfig; import accord.coordinate.CoordinationAdapter; @@ -40,6 +42,7 @@ import accord.impl.DefaultTimeouts; import accord.impl.InMemoryCommandStores; import accord.impl.IntKey; import accord.impl.DefaultLocalListeners; +import accord.impl.basic.InMemoryJournal; import accord.impl.progresslog.DefaultProgressLogs; import accord.impl.DefaultRemoteListeners; import accord.impl.SizeOfIntersectionSorter; @@ -130,13 +133,15 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> MessageSink messageSink = messageSinkFactory.apply(id, this); MockConfigurationService configurationService = new MockConfigurationService(messageSink, onFetchTopology, topology); LocalConfig localConfig = LocalConfig.DEFAULT; + Agent agent = new TestAgent(); + Journal journal = new InMemoryJournal(id, agent); Node node = new Node(id, messageSink, configurationService, time, () -> store, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), - new TestAgent(), + agent, random.fork(), new ThreadPoolScheduler(), SizeOfIntersectionSorter.SUPPLIER, @@ -147,7 +152,8 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> InMemoryCommandStores.SingleThread::new, new CoordinationAdapter.DefaultFactory(), DurableBefore.NOOP_PERSISTER, - localConfig); + localConfig, + journal); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(topology, false, true); return node; diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index 58cb6cbb..a9746ed2 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import accord.api.Agent; import accord.api.LocalConfig; import accord.api.ProgressLog.NoOpProgressLog; import accord.api.RoutingKey; @@ -43,6 +44,7 @@ import accord.impl.SizeOfIntersectionSorter; import accord.impl.TestAgent; import accord.impl.TestAgent.RethrowAgent; import accord.impl.TopologyFactory; +import accord.impl.basic.InMemoryJournal; import accord.impl.mock.MockCluster; import accord.impl.mock.MockConfigurationService; import accord.impl.mock.MockStore; @@ -101,14 +103,14 @@ public class ImmutableCommandTest { MockCluster.Clock clock = new MockCluster.Clock(100); LocalConfig localConfig = LocalConfig.DEFAULT; - Node node = new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), - clock, - () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED, + Agent agent = new TestAgent(); + Node node = new Node(id, null, new MockConfigurationService(null, (epoch, service) -> { }, storeSupport.local.get()), clock, + () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), agent, new DefaultRandom(), Scheduler.NEVER_RUN_SCHEDULED, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultTimeouts::new, ignore -> ignore2 -> new NoOpProgressLog(), DefaultLocalListeners.Factory::new, InMemoryCommandStores.Synchronized::new, new CoordinationAdapter.DefaultFactory(), DurableBefore.NOOP_PERSISTER, - localConfig); + localConfig, new InMemoryJournal(id, agent)); awaitUninterruptibly(node.unsafeStart()); node.onTopologyUpdate(storeSupport.local.get(), false, true); return node; diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index 8fb34489..210d0ae3 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -29,9 +29,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -39,6 +41,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import accord.api.Journal; import accord.api.MessageSink; import accord.api.Scheduler; import accord.api.LocalConfig; @@ -50,9 +53,12 @@ import accord.impl.progresslog.DefaultProgressLogs; import accord.impl.DefaultRemoteListeners; import accord.impl.SizeOfIntersectionSorter; import accord.local.AgentExecutor; +import accord.local.Command; +import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.Node; import accord.local.Node.Id; +import accord.local.RedundantBefore; import accord.local.ShardDistributor; import accord.local.TimeService; import accord.messages.Callback; @@ -61,7 +67,11 @@ import accord.messages.Reply.FailureReply; import accord.messages.ReplyContext; import accord.messages.Request; import accord.messages.SafeCallback; +import accord.primitives.Ranges; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; import accord.topology.Topology; +import accord.utils.PersistentField; import accord.utils.RandomSource; import accord.utils.async.AsyncChains; import accord.utils.async.AsyncResult; @@ -347,7 +357,8 @@ public class Cluster implements Scheduler MaelstromAgent.INSTANCE, randomSupplier.get(), sinks, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, InMemoryCommandStores.SingleThread::new, new CoordinationAdapter.DefaultFactory(), - DurableBefore.NOOP_PERSISTER, localConfig)); + DurableBefore.NOOP_PERSISTER, localConfig, + new NoOpJournal())); } AsyncResult<?> startup = AsyncChains.reduce(lookup.values().stream().map(Node::unsafeStart).collect(toList()), (a, b) -> null).beginAsResult(); @@ -373,4 +384,21 @@ public class Cluster implements Scheduler lookup.values().forEach(Node::shutdown); } } -} + + public static class NoOpJournal implements Journal + { + @Override public Command loadCommand(int store, TxnId txnId, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } + @Override public Command.Minimal loadMinimal(int store, TxnId txnId, Load load, RedundantBefore redundantBefore, DurableBefore durableBefore) { throw new IllegalStateException("Not impelemented"); } + @Override public void saveCommand(int store, CommandUpdate value, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } + @Override public Iterator<TopologyUpdate> replayTopologies() { throw new IllegalStateException("Not impelemented"); } + @Override public void saveTopology(TopologyUpdate topologyUpdate, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } + @Override public void purge(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } + @Override public void replay(CommandStores commandStores) { throw new IllegalStateException("Not impelemented"); } + @Override public RedundantBefore loadRedundantBefore(int store) { throw new IllegalStateException("Not impelemented"); } + @Override public NavigableMap<TxnId, Ranges> loadBootstrapBeganAt(int store) { throw new IllegalStateException("Not impelemented"); } + @Override public NavigableMap<Timestamp, Ranges> loadSafeToRead(int store) { throw new IllegalStateException("Not impelemented"); } + @Override public CommandStores.RangesForEpoch loadRangesForEpoch(int store) { throw new IllegalStateException("Not impelemented"); } + @Override public PersistentField.Persister<DurableBefore, DurableBefore> durableBeforePersister() { throw new IllegalStateException("Not impelemented"); } + @Override public void saveStoreState(int store, FieldUpdates fieldUpdates, Runnable onFlush) { throw new IllegalStateException("Not impelemented"); } + } +} \ No newline at end of file diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 306313e1..122d1a4d 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.function.Supplier; +import accord.api.Journal; import accord.api.MessageSink; import accord.api.Scheduler; import accord.api.LocalConfig; @@ -179,13 +180,14 @@ public class Main topology = topologyFactory.toTopology(init.cluster); sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); LocalConfig localConfig = LocalConfig.DEFAULT; + Journal journal = new Cluster.NoOpJournal(); on = new Node(init.self, sink, new SimpleConfigService(topology), TimeService.ofNonMonotonic(System::currentTimeMillis, TimeUnit.MILLISECONDS), MaelstromStore::new, new ShardDistributor.EvenSplit(8, ignore -> new MaelstromKey.Splitter()), MaelstromAgent.INSTANCE, new DefaultRandom(), scheduler, SizeOfIntersectionSorter.SUPPLIER, DefaultRemoteListeners::new, DefaultTimeouts::new, DefaultProgressLogs::new, DefaultLocalListeners.Factory::new, InMemoryCommandStores.SingleThread::new, new CoordinationAdapter.DefaultFactory(), - DurableBefore.NOOP_PERSISTER, localConfig); + DurableBefore.NOOP_PERSISTER, localConfig, journal); awaitUninterruptibly(on.unsafeStart()); err.println("Initialized node " + init.self); err.flush(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org