This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 03f937175dbcf04243bb0ac48b64746c1a07bc9c Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Tue Jun 20 16:04:33 2023 -0700 Accord/C*/TCM bootstrap integration Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-18444 --- .../main/java/accord/api/ConfigurationService.java | 13 +- .../java/accord/coordinate/FetchCoordinator.java | 1 + .../accord/impl/AbstractConfigurationService.java | 173 +++++++++---- .../java/accord/impl/AbstractFetchCoordinator.java | 281 +++++++++++++++++++++ .../java/accord/impl/InMemoryCommandStore.java | 15 -- .../src/main/java/accord/local/CommandStore.java | 32 ++- .../src/main/java/accord/local/CommandStores.java | 14 +- accord-core/src/main/java/accord/local/Node.java | 13 +- .../src/main/java/accord/messages/MessageType.java | 4 +- .../main/java/accord/messages/WaitAndReadData.java | 12 +- .../src/main/java/accord/topology/Topology.java | 8 + .../main/java/accord/topology/TopologyManager.java | 29 ++- .../java/accord/utils/ReducingIntervalMap.java | 27 +- .../main/java/accord/utils/ReducingRangeMap.java | 8 + .../main/java/accord/utils/async/AsyncChains.java | 12 + .../accord/api/TestableConfigurationService.java | 3 +- .../accord/burn/BurnTestConfigurationService.java | 10 +- .../src/test/java/accord/burn/TopologyUpdates.java | 2 +- .../java/accord/coordinate/TopologyChangeTest.java | 3 +- .../impl/AbstractConfigurationServiceTest.java | 10 +- .../accord/impl/list/ListFetchCoordinator.java | 69 +++++ .../src/test/java/accord/impl/list/ListStore.java | 245 +----------------- .../test/java/accord/impl/mock/MockCluster.java | 2 +- .../accord/impl/mock/MockConfigurationService.java | 19 +- .../java/accord/local/ImmutableCommandTest.java | 2 +- .../java/accord/topology/TopologyManagerTest.java | 32 ++- 26 files changed, 665 insertions(+), 374 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index f7ecfece..177094be 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -18,6 +18,8 @@ package accord.api; +import javax.annotation.Nullable; + import accord.local.Node; import accord.topology.Topology; import accord.utils.async.AsyncResult; @@ -50,7 +52,7 @@ import accord.utils.async.AsyncResults; * nodes that this node has synced data for the previous epoch. * * - ConfigurationService will notify the node when other nodes complete syncing an epoch by calling - * {@link accord.api.ConfigurationService.Listener#onEpochSyncComplete(accord.local.Node.Id, long)} + * {@link accord.api.ConfigurationService.Listener#onRemoteSyncComplete(accord.local.Node.Id, long)} * */ public interface ConfigurationService @@ -111,13 +113,13 @@ public interface ConfigurationService * * TODO (required): document what this Future represents, or maybe refactor it away - only used for testing */ - AsyncResult<Void> onTopologyUpdate(Topology topology); + AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync); /** * Called when accord data associated with a superseded epoch has been sync'd from previous replicas. * This should be invoked on each replica once EpochReady.coordination has returned on a replica. */ - void onEpochSyncComplete(Node.Id node, long epoch); + void onRemoteSyncComplete(Node.Id node, long epoch); /** * Called when the configuration service is meant to truncate it's topology data up to (but not including) @@ -138,7 +140,10 @@ public interface ConfigurationService return currentTopology().epoch(); } - Topology getTopologyForEpoch(long epoch); + /** + * Returns the topology for the given epoch if it's available, null otherwise + */ + @Nullable Topology getTopologyForEpoch(long epoch); /** * Method for reporting epochs the configuration service may not be aware of. To be notified when the new epoch diff --git a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java index ab903bc8..60d0bba1 100644 --- a/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java +++ b/accord-core/src/main/java/accord/coordinate/FetchCoordinator.java @@ -218,6 +218,7 @@ public abstract class FetchCoordinator // it must only ensure needed.isEmpty() (if possible) protected Ranges trySendMore(List<State> states, Ranges needed) { + // TODO (soon, required) : need to correctly handle the cluster having fewer nodes than replication factor for (State state : states) { Ranges contact = state.uncontacted.slice(needed, Minimal); diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java index 6d2aaf39..fc77187a 100644 --- a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -30,28 +30,32 @@ import accord.api.ConfigurationService; import accord.local.Node; import accord.topology.Topology; import accord.utils.Invariants; +import accord.utils.async.AsyncChain; import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; -public abstract class AbstractConfigurationService implements ConfigurationService +public abstract class AbstractConfigurationService<EpochState extends AbstractConfigurationService.AbstractEpochState, + EpochHistory extends AbstractConfigurationService.AbstractEpochHistory<EpochState>> + implements ConfigurationService { private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); - protected final Node.Id node; + protected final Node.Id localId; - protected final EpochHistory epochs = new EpochHistory(); + protected final EpochHistory epochs = createEpochHistory(); protected final List<Listener> listeners = new ArrayList<>(); - static class EpochState + public abstract static class AbstractEpochState { - private final long epoch; - private final AsyncResult.Settable<Topology> received = AsyncResults.settable(); - private final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected final long epoch; + protected final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + protected final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + protected AsyncResult<Void> reads = null; - private Topology topology = null; + protected Topology topology = null; - public EpochState(long epoch) + public AbstractEpochState(long epoch) { this.epoch = epoch; } @@ -68,21 +72,26 @@ public abstract class AbstractConfigurationService implements ConfigurationServi } } + /** + * Access needs to be synchronized by the parent ConfigurationService class + */ @VisibleForTesting - protected static class EpochHistory + public abstract static class AbstractEpochHistory<EpochState extends AbstractEpochState> { // TODO (low priority): move pendingEpochs / FetchTopology into here? private List<EpochState> epochs = new ArrayList<>(); protected long lastReceived = 0; - private long lastAcknowledged = 0; + protected long lastAcknowledged = 0; - long minEpoch() + protected abstract EpochState createEpochState(long epoch); + + public long minEpoch() { return epochs.isEmpty() ? 0L : epochs.get(0).epoch; } - long maxEpoch() + public long maxEpoch() { int size = epochs.size(); return size == 0 ? 0L : epochs.get(size - 1).epoch; @@ -102,10 +111,10 @@ public abstract class AbstractConfigurationService implements ConfigurationServi EpochState getOrCreate(long epoch) { - Invariants.checkArgument(epoch > 0); + Invariants.checkArgument(epoch > 0, "Epoch must be positive but given %d", epoch); if (epochs.isEmpty()) { - EpochState state = new EpochState(epoch); + EpochState state = createEpochState(epoch); epochs.add(state); return state; } @@ -116,34 +125,31 @@ public abstract class AbstractConfigurationService implements ConfigurationServi int prepend = Ints.checkedCast(minEpoch - epoch); List<EpochState> next = new ArrayList<>(epochs.size() + prepend); for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) - next.add(new EpochState(addEpoch)); + next.add(createEpochState(addEpoch)); next.addAll(epochs); epochs = next; minEpoch = minEpoch(); - Invariants.checkState(minEpoch == epoch); + Invariants.checkState(minEpoch == epoch, "Epoch %d != %d", epoch, minEpoch); } long maxEpoch = maxEpoch(); int idx = Ints.checkedCast(epoch - minEpoch); // add any missing epochs for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) - epochs.add(new EpochState(addEpoch)); + epochs.add(createEpochState(addEpoch)); return epochs.get(idx); } - public EpochHistory receive(Topology topology) + public void receive(Topology topology) { long epoch = topology.epoch(); - Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0, + "Epoch %d != %d + 1", epoch, lastReceived); lastReceived = epoch; EpochState state = getOrCreate(epoch); - if (state != null) - { - state.topology = topology; - state.received.setSuccess(topology); - } - return this; + state.topology = topology; + state.received.setSuccess(topology); } AsyncResult<Topology> receiveFuture(long epoch) @@ -156,12 +162,16 @@ public abstract class AbstractConfigurationService implements ConfigurationServi return getOrCreate(epoch).topology; } - public EpochHistory acknowledge(long epoch) + public void acknowledge(EpochReady ready) { - Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + long epoch = ready.epoch; + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0, + "Epoch %d != %d + 1", epoch, lastAcknowledged); lastAcknowledged = epoch; - getOrCreate(epoch).acknowledged.setSuccess(null); - return this; + EpochState state = getOrCreate(epoch); + Invariants.checkState(state.reads == null, "Reads result was already set for epoch", epoch); + state.reads = ready.reads; + state.acknowledged.setSuccess(null); } AsyncResult<Void> acknowledgeFuture(long epoch) @@ -171,19 +181,26 @@ public abstract class AbstractConfigurationService implements ConfigurationServi void truncateUntil(long epoch) { - Invariants.checkArgument(epoch <= maxEpoch()); + Invariants.checkArgument(epoch <= maxEpoch(), "epoch %d > %d", epoch, maxEpoch()); long minEpoch = minEpoch(); int toTrim = Ints.checkedCast(epoch - minEpoch); - if (toTrim <=0) + if (toTrim <= 0) return; epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); } } - public AbstractConfigurationService(Node.Id node) + public AbstractConfigurationService(Node.Id localId) { - this.node = node; + this.localId = localId; + } + + protected abstract EpochHistory createEpochHistory(); + + protected synchronized EpochState getOrCreateEpochState(long epoch) + { + return epochs.getOrCreate(epoch); } @Override @@ -215,54 +232,68 @@ public abstract class AbstractConfigurationService implements ConfigurationServi fetchTopologyInternal(epoch); } - protected abstract void epochSyncComplete(Topology topology ); + protected abstract void localSyncComplete(Topology topology); @Override - public synchronized void acknowledgeEpoch(EpochReady ready) + public void acknowledgeEpoch(EpochReady ready) { - ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch)); - ready.coordination.addCallback(() -> epochSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + ready.metadata.addCallback(() -> { + synchronized (AbstractConfigurationService.this) + { + epochs.acknowledge(ready); + } + }); + ready.coordination.addCallback(() -> { + synchronized (AbstractConfigurationService.this) + { + localSyncComplete(epochs.getOrCreate(ready.epoch).topology); + } + }); } protected void topologyUpdatePreListenerNotify(Topology topology) {} protected void topologyUpdatePostListenerNotify(Topology topology) {} - public synchronized AsyncResult<Void> reportTopology(Topology topology) + public synchronized void reportTopology(Topology topology, boolean startSync) { long lastReceived = epochs.lastReceived; if (topology.epoch() <= lastReceived) - return AsyncResults.success(null); + return; if (lastReceived > 0 && topology.epoch() > lastReceived + 1) { fetchTopologyForEpoch(lastReceived + 1); - epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology)); - return AsyncResults.success(null); + epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology, startSync)); + return; } long lastAcked = epochs.lastAcknowledged; if (lastAcked > 0 && topology.epoch() > lastAcked + 1) { - epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology)); - return AsyncResults.success(null); + epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology, startSync)); + return; } - logger.trace("Epoch {} received by {}", topology.epoch(), node); + logger.trace("Epoch {} received by {}", topology.epoch(), localId); epochs.receive(topology); topologyUpdatePreListenerNotify(topology); for (Listener listener : listeners) - listener.onTopologyUpdate(topology); + listener.onTopologyUpdate(topology, startSync); topologyUpdatePostListenerNotify(topology); - return AsyncResults.success(null); } - protected void epochSyncCompletePreListenerNotify(Node.Id node, long epoch) {} + public synchronized void reportTopology(Topology topology) + { + reportTopology(topology, true); + } + + protected void remoteSyncCompletePreListenerNotify(Node.Id node, long epoch) {} - public synchronized void epochSyncComplete(Node.Id node, long epoch) + public synchronized void remoteSyncComplete(Node.Id node, long epoch) { - epochSyncCompletePreListenerNotify(node, epoch); + remoteSyncCompletePreListenerNotify(node, epoch); for (Listener listener : listeners) - listener.onEpochSyncComplete(node, epoch); + listener.onRemoteSyncComplete(node, epoch); } protected void truncateTopologiesPreListenerNotify(long epoch) {} @@ -276,4 +307,44 @@ public abstract class AbstractConfigurationService implements ConfigurationServi truncateTopologiesPostListenerNotify(epoch); epochs.truncateUntil(epoch); } + + public synchronized AsyncChain<Void> epochReady(long epoch) + { + EpochState state = epochs.getOrCreate(epoch); + if (state.reads != null) + return state.reads; + + return state.acknowledged.flatMap(r -> state.reads); + } + + public abstract static class Minimal extends AbstractConfigurationService<Minimal.EpochState, Minimal.EpochHistory> + { + static class EpochState extends AbstractEpochState + { + public EpochState(long epoch) + { + super(epoch); + } + } + + static class EpochHistory extends AbstractEpochHistory<EpochState> + { + @Override + protected EpochState createEpochState(long epoch) + { + return new EpochState(epoch); + } + } + + public Minimal(Node.Id node) + { + super(node); + } + + @Override + protected EpochHistory createEpochHistory() + { + return new EpochHistory(); + } + } } diff --git a/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java new file mode 100644 index 00000000..283611e7 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/AbstractFetchCoordinator.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.Data; +import accord.api.DataStore; +import accord.coordinate.FetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.Status; +import accord.messages.Callback; +import accord.messages.MessageType; +import accord.messages.ReadData; +import accord.messages.WaitAndReadData; +import accord.primitives.PartialDeps; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import accord.utils.async.AsyncChains; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +import static accord.primitives.Routables.Slice.Minimal; + +public abstract class AbstractFetchCoordinator extends FetchCoordinator +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractFetchCoordinator.class); + + static class FetchResult extends AsyncResults.SettableResult<Ranges> implements DataStore.FetchResult + { + final AbstractFetchCoordinator coordinator; + + FetchResult(AbstractFetchCoordinator coordinator) + { + this.coordinator = coordinator; + } + + @Override + public void abort(Ranges abort) + { + coordinator.abort(abort); + } + } + + static class Key + { + final Node.Id id; + final Ranges ranges; + + Key(Node.Id id, Ranges ranges) + { + this.id = id; + this.ranges = ranges; + } + + @Override + public int hashCode() + { + return (31 + id.hashCode()) * 31 + ranges.hashCode(); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) return true; + if (!(obj instanceof Key)) return false; + Key that = (Key) obj; + return id.equals(that.id) && ranges.equals(that.ranges); + } + } + + final DataStore.FetchRanges fetchRanges; + final CommandStore commandStore; + final Map<Key, DataStore.StartingRangeFetch> inflight = new HashMap<>(); + final FetchResult result = new FetchResult(this); + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + protected AbstractFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore) + { + super(node, ranges, syncPoint, fetchRanges); + this.fetchRanges = fetchRanges; + this.commandStore = commandStore; + } + + public CommandStore commandStore() + { + return commandStore; + } + + protected abstract PartialTxn rangeReadTxn(Ranges ranges); + + protected abstract void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges ranges); + + @Override + public void contact(Node.Id to, Ranges ranges) + { + Key key = new Key(to, ranges); + inflight.put(key, starting(to, ranges)); + Ranges ownedRanges = ownedRangesForNode(to); + Invariants.checkArgument(ownedRanges.containsAll(ranges), "Got a reply from %s for ranges %s, but owned ranges %s does not contain all the ranges", to, ranges, ownedRanges); + PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); + node.send(to, new FetchRequest(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, rangeReadTxn(ranges)), new Callback<ReadData.ReadReply>() + { + @Override + public void onSuccess(Node.Id from, ReadData.ReadReply reply) + { + if (!reply.isOk()) + { + fail(to, new RuntimeException(reply.toString())); + inflight.remove(key).cancel(); + switch ((ReadData.ReadNack) reply) + { + default: throw new AssertionError("Unhandled enum: " + reply); + case Invalid: + case Redundant: + case NotCommitted: + throw new AssertionError(String.format("Unexpected reply: %s", reply)); + case Error: + // TODO (required): ensure errors are propagated to coordinators and can be logged + } + return; + } + + FetchResponse ok = (FetchResponse) reply; + Ranges received; + if (ok.unavailable != null) + { + unavailable(to, ok.unavailable); + if (ok.data == null) + { + inflight.remove(key).cancel(); + return; + } + received = ranges.difference(ok.unavailable); + } + else + { + received = ranges; + } + + // TODO (now): make sure it works if invoked in either order + inflight.remove(key).started(ok.maxApplied); + onReadOk(to, commandStore, ok.data, received); + // received must be invoked after submitting the persistence future, as it triggers onDone + // which creates a ReducingFuture over {@code persisting} + } + + @Override + public void onFailure(Node.Id from, Throwable failure) + { + inflight.remove(key).cancel(); + fail(from, failure); + } + + @Override + public void onCallbackFailure(Node.Id from, Throwable failure) + { + // TODO (soon) + logger.error("Fetch coordination failure from " + from, failure); + } + }); + } + + public FetchResult result() + { + return result; + } + + @Override + protected void onDone(Ranges success, Throwable failure) + { + if (failure != null || success.isEmpty()) result.setFailure(failure); + else if (persisting.isEmpty()) result.setSuccess(Ranges.EMPTY); + else AsyncChains.reduce(persisting, (a, b) -> null) + .begin((s, f) -> { + if (f == null) result.setSuccess(ranges); + else result.setFailure(f); + }); + } + + @Override + public void start() + { + super.start(); + } + + void abort(Ranges abort) + { + // TODO (required, later): implement abort + } + + public static class FetchRequest extends WaitAndReadData + { + public final PartialDeps partialDeps; + private transient Timestamp maxApplied; + + public FetchRequest(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) + { + super(ranges, sourceEpoch, Status.Applied, partialDeps, Timestamp.MAX, syncId, partialTxn); + this.partialDeps = partialDeps; + } + + @Override + protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) + { + Ranges slice = commandStore.rangesForEpochHolder().get().allAt(executeReadAt).difference(unavailable); + commandStore.maxAppliedFor(readScope, slice).begin((newMaxApplied, failure) -> { + if (failure != null) + { + commandStore.agent().onUncaughtException(failure); + } + else + { + synchronized (this) + { + if (maxApplied == null) maxApplied = newMaxApplied; + else maxApplied = Timestamp.max(maxApplied, newMaxApplied); + Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal); + super.readComplete(commandStore, result, reportUnavailable); + } + } + }); + } + + @Override + protected void reply(@Nullable Ranges unavailable, @Nullable Data data) + { + node.reply(replyTo, replyContext, new FetchResponse(unavailable, data, maxApplied)); + } + + @Override + public MessageType type() + { + return MessageType.FETCH_DATA_REQ; + } + } + + public static class FetchResponse extends ReadData.ReadOk + { + public final Timestamp maxApplied; + public FetchResponse(@Nullable Ranges unavailable, @Nullable Data data, Timestamp maxApplied) + { + super(unavailable, data); + this.maxApplied = maxApplied; + } + + @Override + public MessageType type() + { + return MessageType.FETCH_DATA_RSP; + } + } +} diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index a380bdec..f4b160bd 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -81,8 +81,6 @@ import static accord.local.SafeCommandStore.TestDep.WITH; import static accord.local.Status.Committed; import static accord.local.Status.PreAccepted; import static accord.local.Status.PreCommitted; -import static accord.local.Status.Applied; -import static accord.local.Status.Invalidated; import static accord.primitives.Routables.Slice.Minimal; public abstract class InMemoryCommandStore extends CommandStore @@ -716,19 +714,6 @@ public abstract class InMemoryCommandStore extends CommandStore }); } - public Timestamp maxApplied(Seekables<?, ?> keysOrRanges, Ranges slice) - { - Seekables<?, ?> sliced = keysOrRanges.slice(slice, Minimal); - Timestamp timestamp = Timestamp.NONE; - for (SafeCommand safeCommand : commands.values()) - { - Command command = safeCommand.current(); - if (command.hasBeen(Applied) && !command.hasBeen(Invalidated) && command.partialTxn().keys().intersects(sliced)) - timestamp = Timestamp.max(timestamp, command.executeAt()); - } - return timestamp; - } - @Override public NodeTimeService time() { diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 37046e42..becc7538 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -128,6 +128,21 @@ public abstract class CommandStore implements AgentExecutor public abstract void shutdown(); + private static Timestamp maxApplied(SafeCommandStore safeStore, Seekables<?, ?> keysOrRanges, Ranges slice) + { + return safeStore.mapReduce(keysOrRanges, slice, SafeCommandStore.TestKind.Ws, + SafeCommandStore.TestTimestamp.STARTED_AFTER, Timestamp.NONE, + SafeCommandStore.TestDep.ANY_DEPS, null, + Status.Applied, Status.Applied, + (key, txnId, executeAt, max) -> Timestamp.max(max, executeAt), + Timestamp.NONE, Timestamp.MAX); + } + + public AsyncChain<Timestamp> maxAppliedFor(Seekables<?, ?> keysOrRanges, Ranges slice) + { + return submit(PreLoadContext.contextFor(keysOrRanges), safeStore -> maxApplied(safeStore, keysOrRanges, slice)); + } + // implementations are expected to override this for persistence protected void setRejectBefore(ReducingRangeMap<Timestamp> newRejectBefore) { @@ -154,6 +169,21 @@ public abstract class CommandStore implements AgentExecutor this.safeToRead = newSafeToRead; } + public NavigableMap<TxnId, Ranges> bootstrapBeganAt() + { + return bootstrapBeganAt; + } + + public NavigableMap<Timestamp, Ranges> safeToRead() + { + return safeToRead; + } + + public long maxBootstrapEpoch() + { + return maxBootstrapEpoch; + } + public void markExclusiveSyncPoint(TxnId txnId, Ranges ranges, SafeCommandStore safeStore) { Invariants.checkArgument(txnId.rw() == ExclusiveSyncPoint); @@ -286,7 +316,7 @@ public abstract class CommandStore implements AgentExecutor * So, the outer future's success is sufficient for the topology to be acknowledged, and the inner future for the * bootstrap to be complete. */ - Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch) + protected Supplier<EpochReady> sync(Node node, Ranges ranges, long epoch) { return () -> { AsyncResults.SettableResult<Void> whenDone = new AsyncResults.SettableResult<>(); diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index c5dca548..2cb4eef1 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ b/accord-core/src/main/java/accord/local/CommandStores.java @@ -331,7 +331,7 @@ public abstract class CommandStores return newLocalTopology.epoch() != 1; } - private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology) + private synchronized TopologyUpdate updateTopology(Node node, Snapshot prev, Topology newTopology, boolean startSync) { checkArgument(!newTopology.isSubset(), "Use full topology for CommandStores.updateTopology"); @@ -364,8 +364,10 @@ public abstract class CommandStores bootstrapUpdates.add(shard.store.interruptBootstraps(epoch, newRanges.currentRanges())); } // TODO (desired): only sync affected shards - if (epoch > 1) - bootstrapUpdates.add(shard.store.sync(node, shard.ranges().currentRanges(), epoch)); + Ranges ranges = shard.ranges().currentRanges(); + // ranges can be empty when ranges are lost or consolidated across epochs. + if (epoch > 1 && startSync && !ranges.isEmpty()) + bootstrapUpdates.add(shard.store.sync(node, ranges, epoch)); result.add(shard); } @@ -378,7 +380,7 @@ public abstract class CommandStores ShardHolder shardHolder = new ShardHolder(supplier.create(nextId++, rangesHolder), rangesHolder); rangesHolder.current = new RangesForEpoch(epoch, add, shardHolder.store); - Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.local, newLocalTopology, range)); + Map<Boolean, Ranges> partitioned = add.partitioningBy(range -> shouldBootstrap(node, prev.global, newLocalTopology, range)); if (partitioned.containsKey(true)) bootstrapUpdates.add(shardHolder.store.bootstrapper(node, partitioned.get(true), newLocalTopology.epoch())); if (partitioned.containsKey(false)) @@ -515,9 +517,9 @@ public abstract class CommandStores return chain; } - public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology) + public synchronized Supplier<EpochReady> updateTopology(Node node, Topology newTopology, boolean startSync) { - TopologyUpdate update = updateTopology(node, current, newTopology); + TopologyUpdate update = updateTopology(node, current, newTopology, startSync); current = update.snapshot; return update.bootstrap; } diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 01e83ebb..973885fd 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -149,9 +149,10 @@ public class Node implements ConfigurationService.Listener, NodeTimeService configService.registerListener(this); } + // TODO (cleanup, testing): remove, only used by Maelstrom public AsyncResult<Void> start() { - return onTopologyUpdateInternal(configService.currentTopology()).metadata; + return onTopologyUpdateInternal(configService.currentTopology(), false).metadata; } public CommandStores commandStores() @@ -175,25 +176,25 @@ public class Node implements ConfigurationService.Listener, NodeTimeService return topology().epoch(); } - private synchronized EpochReady onTopologyUpdateInternal(Topology topology) + private synchronized EpochReady onTopologyUpdateInternal(Topology topology, boolean startSync) { - Supplier<EpochReady> bootstrap = commandStores.updateTopology(this, topology); + Supplier<EpochReady> bootstrap = commandStores.updateTopology(this, topology, startSync); this.topology.onTopologyUpdate(topology); return bootstrap.get(); } @Override - public synchronized AsyncResult<Void> onTopologyUpdate(Topology topology) + public synchronized AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync) { if (topology.epoch() <= this.topology.epoch()) return AsyncResults.success(null); - EpochReady ready = onTopologyUpdateInternal(topology); + EpochReady ready = onTopologyUpdateInternal(topology, startSync); configService.acknowledgeEpoch(ready); return ready.coordination; } @Override - public void onEpochSyncComplete(Id node, long epoch) + public void onRemoteSyncComplete(Id node, long epoch) { topology.onEpochSyncComplete(node, epoch); } diff --git a/accord-core/src/main/java/accord/messages/MessageType.java b/accord-core/src/main/java/accord/messages/MessageType.java index f5361e6e..f3b44648 100644 --- a/accord-core/src/main/java/accord/messages/MessageType.java +++ b/accord-core/src/main/java/accord/messages/MessageType.java @@ -47,6 +47,8 @@ public enum MessageType INFORM_HOME_DURABLE_REQ (true ), CHECK_STATUS_REQ (false), CHECK_STATUS_RSP (false), + FETCH_DATA_REQ (false), + FETCH_DATA_RSP (false) ; /** @@ -58,4 +60,4 @@ public enum MessageType { this.hasSideEffects = hasSideEffects; } -} \ No newline at end of file +} diff --git a/accord-core/src/main/java/accord/messages/WaitAndReadData.java b/accord-core/src/main/java/accord/messages/WaitAndReadData.java index 33716a7c..c854aa94 100644 --- a/accord-core/src/main/java/accord/messages/WaitAndReadData.java +++ b/accord-core/src/main/java/accord/messages/WaitAndReadData.java @@ -30,12 +30,12 @@ import org.agrona.collections.Int2ObjectHashMap; public abstract class WaitAndReadData extends ReadData { - final Status waitForStatus; - final Deps waitOn; - protected final Timestamp waitUntil; // this may be set to Timestamp.MAX if we want to wait for all deps, regardless of when they execute - protected final Timestamp executeReadAt; - final PartialTxn read; - final Int2ObjectHashMap<LocalBarrier> barriers = new Int2ObjectHashMap<>(); + public final Status waitForStatus; + public final Deps waitOn; + public final Timestamp waitUntil; // this may be set to Timestamp.MAX if we want to wait for all deps, regardless of when they execute + public final Timestamp executeReadAt; + public final PartialTxn read; + transient final Int2ObjectHashMap<LocalBarrier> barriers = new Int2ObjectHashMap<>(); protected WaitAndReadData(Seekables<?, ?> readScope, long waitForEpoch, Status waitForStatus, Deps waitOn, Timestamp waitUntil, Timestamp executeReadAt, PartialTxn read) { diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 56b251e9..30d06788 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -28,6 +28,7 @@ import accord.local.Node.Id; import accord.primitives.*; import accord.utils.*; import accord.utils.ArrayBuffers.IntBuffers; +import com.google.common.annotations.VisibleForTesting; import static accord.utils.SortedArrays.Search.FLOOR; import static accord.utils.SortedArrays.exponentialSearch; @@ -35,6 +36,7 @@ import static accord.utils.SortedArrays.exponentialSearch; public class Topology { public static final Topology EMPTY = new Topology(0, new Shard[0], Ranges.EMPTY, Collections.emptyMap(), Ranges.EMPTY, new int[0]); + private static final int[] EMPTY_SUBSET = new int[0]; final long epoch; final Shard[] shards; final Ranges ranges; @@ -193,6 +195,12 @@ public class Topology return Arrays.binarySearch(supersetIndexes, i); } + @VisibleForTesting + public Topology withEmptySubset() + { + return forSubset(EMPTY_SUBSET); + } + public Topology forSelection(Unseekables<?, ?> select, OnUnknown onUnknown) { return forSelection(select, onUnknown, (ignore, index) -> true, null); diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index 9c083a5c..d1fb8bfd 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -69,9 +69,19 @@ public class TopologyManager this.local = global.forNode(node).trim(); Invariants.checkArgument(!global().isSubset()); // TODO: can we just track sync for local ranges here? - this.syncTracker = new QuorumTracker(new Single(sorter, global())); - this.syncComplete = syncComplete; - this.prevSynced = prevSynced; + if (global().size() > 0) + { + this.syncTracker = new QuorumTracker(new Single(sorter, global())); + this.syncComplete = syncComplete; + this.prevSynced = prevSynced; + } + else + { + // if topology is empty, there is nothing to sync + this.syncTracker = null; + this.syncComplete = true; + this.prevSynced = true; + } } void markPrevSynced() @@ -124,7 +134,7 @@ public class TopologyManager boolean shardIsUnsynced(int idx) { - return !prevSynced || !syncTracker.get(idx).hasReachedQuorum(); + return !prevSynced || (!syncComplete && !syncTracker.get(idx).hasReachedQuorum()); } } @@ -178,6 +188,8 @@ public class TopologyManager public long minEpoch() { + if (currentEpoch == 0) + return 0; return currentEpoch - epochs.length + 1; } @@ -292,17 +304,16 @@ public class TopologyManager public synchronized void truncateTopologyUntil(long epoch) { Epochs current = epochs; - checkArgument(current.epoch() >= epoch); + checkArgument(current.epoch() >= epoch, "Unable to truncate; epoch %d is > current epoch %d", epoch , current.epoch()); if (current.minEpoch() >= epoch) return; int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); - Invariants.checkState(current.epochs[newLen - 1].syncComplete()); + Invariants.checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch()); - EpochState[] nextEpochs = new EpochState[newLen]; - System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); - epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures); + epochs = new Epochs(Arrays.copyOfRange(current.epochs, 0, newLen), + current.pendingSyncComplete, current.futureEpochFutures); } public TopologySorter.Supplier sorter() diff --git a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java index b5962058..e9e4d1b4 100644 --- a/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingIntervalMap.java @@ -62,6 +62,11 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V> @VisibleForTesting ReducingIntervalMap(boolean inclusiveEnds, K[] ends, V[] values) { + if (ends.length != values.length - 1) + throw new IllegalArgumentException(String.format("Length %d != %d - 1; %s -> %s", + ends.length, values.length, + Arrays.toString(ends), Arrays.toString(values))); + this.inclusiveEnds = inclusiveEnds; this.ends = ends; this.values = values; @@ -101,6 +106,11 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V> return Arrays.hashCode(values); } + public boolean inclusiveEnds() + { + return inclusiveEnds; + } + public V get(K key) { int idx = Arrays.binarySearch(ends, key); @@ -109,11 +119,22 @@ public class ReducingIntervalMap<K extends Comparable<? super K>, V> return values[idx]; } - public V value(int idx) + private void checkIndex(int idx) { - if (idx < 0 || idx > size()) - throw new IndexOutOfBoundsException(); + if (idx < 0 || idx > size() - 1) + throw new IndexOutOfBoundsException(String.format("%d < 0 or > %d - 1", idx, size())); + } + + public K key(int idx) + { + checkIndex(idx); + return ends[idx]; + } + public V value(int idx) + { + if (idx < 0 || idx >= values.length) + throw new IndexOutOfBoundsException(String.format("%d < 0 or > %d - 1", idx, size())); return values[idx]; } diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java index 7555a059..4788c176 100644 --- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java @@ -28,6 +28,14 @@ import static accord.utils.SortedArrays.Search.FAST; public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> { + public static class SerializerSupport + { + public static <V> ReducingRangeMap<V> create(boolean inclusiveEnds, RoutingKey[] ends, V[] values) + { + return new ReducingRangeMap<>(inclusiveEnds, ends, values); + } + } + final RoutingKeys endKeys; public ReducingRangeMap(V value) diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java index 4c3e7eb4..5fb6ede3 100644 --- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java +++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java @@ -587,4 +587,16 @@ public abstract class AsyncChains<V> implements AsyncChain<V> // ignore } } + + public static void awaitUninterruptiblyAndRethrow(AsyncChain<?> chain) + { + try + { + getUninterruptibly(chain); + } + catch (ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } + } } \ No newline at end of file diff --git a/accord-core/src/test/java/accord/api/TestableConfigurationService.java b/accord-core/src/test/java/accord/api/TestableConfigurationService.java index 264f30bc..0d1a7490 100644 --- a/accord-core/src/test/java/accord/api/TestableConfigurationService.java +++ b/accord-core/src/test/java/accord/api/TestableConfigurationService.java @@ -19,9 +19,8 @@ package accord.api; import accord.topology.Topology; -import accord.utils.async.AsyncResult; public interface TestableConfigurationService extends ConfigurationService { - AsyncResult<?> reportTopology(Topology topology); + void reportTopology(Topology topology); } diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index 855366f6..f893e38f 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -34,7 +34,7 @@ import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; -public class BurnTestConfigurationService extends AbstractConfigurationService implements TestableConfigurationService +public class BurnTestConfigurationService extends AbstractConfigurationService.Minimal implements TestableConfigurationService { private final AgentExecutor executor; private final Function<Node.Id, Node> lookup; @@ -121,7 +121,7 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i if (candidates.isEmpty()) { candidates.addAll(currentTopology().nodes()); - candidates.remove(node); + candidates.remove(localId); } int idx = randomSupplier.get().nextInt(candidates.size()); Node.Id node = candidates.remove(idx); @@ -157,9 +157,9 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i } @Override - protected void epochSyncComplete(Topology topology) + protected void localSyncComplete(Topology topology) { - topologyUpdates.syncComplete(lookup.apply(node), topology.nodes(), topology.epoch()); + topologyUpdates.syncComplete(lookup.apply(localId), topology.nodes(), topology.epoch()); } @Override @@ -174,6 +174,6 @@ public class BurnTestConfigurationService extends AbstractConfigurationService i private Node originator() { - return lookup.apply(node); + return lookup.apply(localId); } } diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdates.java b/accord-core/src/test/java/accord/burn/TopologyUpdates.java index 4d99b393..7092dd31 100644 --- a/accord-core/src/test/java/accord/burn/TopologyUpdates.java +++ b/accord-core/src/test/java/accord/burn/TopologyUpdates.java @@ -69,7 +69,7 @@ public class TopologyUpdates pendingTopologies.remove(epoch); MessageTask.begin(originator, cluster, executor, "SyncComplete:" + epoch, (node, from, onDone) -> { - node.onEpochSyncComplete(originator.id(), epoch); + node.onRemoteSyncComplete(originator.id(), epoch); onDone.accept(true); }); } diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java index 658ffd28..dcdafcc6 100644 --- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java +++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java @@ -73,7 +73,8 @@ public class TopologyChangeTest cluster.configServices(4).forEach(config -> { try { - getUninterruptibly(config.reportTopology(topology2)); + config.reportTopology(topology2); + getUninterruptibly(config.ackFor(topology2.epoch()).coordination); } catch (ExecutionException e) { diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java index d0e6ee03..567736d0 100644 --- a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java +++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java @@ -33,7 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import accord.api.ConfigurationService; -import accord.impl.AbstractConfigurationService.EpochHistory; +import accord.impl.AbstractConfigurationService.Minimal.EpochHistory; import accord.local.Node.Id; import accord.primitives.Range; import accord.topology.Shard; @@ -58,7 +58,7 @@ public class AbstractConfigurationServiceTest } @Override - public AsyncResult<Void> onTopologyUpdate(Topology topology) + public AsyncResult<Void> onTopologyUpdate(Topology topology, boolean startSync) { if (topologies.put(topology.epoch(), topology) != null) Assertions.fail("Received topology twice for epoch " + topology.epoch()); @@ -68,7 +68,7 @@ public class AbstractConfigurationServiceTest } @Override - public void onEpochSyncComplete(Id node, long epoch) + public void onRemoteSyncComplete(Id node, long epoch) { Set<Id> synced = syncCompletes.computeIfAbsent(epoch, e -> new HashSet<>()); if (!synced.add(node)) @@ -113,7 +113,7 @@ public class AbstractConfigurationServiceTest } } - private static class TestableConfigurationService extends AbstractConfigurationService + private static class TestableConfigurationService extends AbstractConfigurationService.Minimal { final Set<Long> syncStarted = new HashSet<>(); final Set<Long> epochsFetched = new HashSet<>(); @@ -130,7 +130,7 @@ public class AbstractConfigurationServiceTest } @Override - protected void epochSyncComplete(Topology topology) + protected void localSyncComplete(Topology topology) { if (!syncStarted.add(topology.epoch())) Assertions.fail("Sync started multiple times for " + topology.epoch()); diff --git a/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java new file mode 100644 index 00000000..a6684bd9 --- /dev/null +++ b/accord-core/src/test/java/accord/impl/list/ListFetchCoordinator.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package accord.impl.list; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import accord.api.Data; +import accord.api.DataStore; +import accord.impl.AbstractFetchCoordinator; +import accord.local.CommandStore; +import accord.local.Node; +import accord.local.PreLoadContext; +import accord.primitives.PartialTxn; +import accord.primitives.Ranges; +import accord.primitives.SyncPoint; +import accord.primitives.Txn; +import accord.utils.Timestamped; +import accord.utils.async.AsyncResult; + +public class ListFetchCoordinator extends AbstractFetchCoordinator +{ + private final ListStore listStore; + final List<AsyncResult<Void>> persisting = new ArrayList<>(); + + public ListFetchCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, DataStore.FetchRanges fetchRanges, CommandStore commandStore, ListStore listStore) + { + super(node, ranges, syncPoint, fetchRanges, commandStore); + this.listStore = listStore; + } + + @Override + protected PartialTxn rangeReadTxn(Ranges ranges) + { + return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(Function.identity(), ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null); + } + + @Override + protected void onReadOk(Node.Id from, CommandStore commandStore, Data data, Ranges received) + { + if (data == null) + return; + + ListData listData = (ListData) data; + persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore -> { + listData.forEach((key, value) -> listStore.data.merge(key, value, Timestamped::merge)); + }).addCallback((ignore, fail) -> { + if (fail == null) success(from, received); + else fail(from, received, fail); + }).beginAsResult()); + } +} diff --git a/accord-core/src/test/java/accord/impl/list/ListStore.java b/accord-core/src/test/java/accord/impl/list/ListStore.java index 1385d9f5..770cc038 100644 --- a/accord-core/src/test/java/accord/impl/list/ListStore.java +++ b/accord-core/src/test/java/accord/impl/list/ListStore.java @@ -18,256 +18,25 @@ package accord.impl.list; -import java.util.*; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; import java.util.stream.Collectors; -import javax.annotation.Nullable; - -import accord.api.Data; +import accord.api.DataStore; import accord.api.Key; -import accord.coordinate.FetchCoordinator; -import accord.impl.InMemoryCommandStore; -import accord.local.CommandStore; import accord.local.Node; -import accord.api.DataStore; -import accord.local.PreLoadContext; import accord.local.SafeCommandStore; -import accord.local.Status; -import accord.messages.ReadData.ReadNack; -import accord.messages.Callback; -import accord.messages.ReadData.ReadOk; -import accord.messages.ReadData.ReadReply; -import accord.messages.WaitAndReadData; -import accord.primitives.PartialDeps; -import accord.primitives.PartialTxn; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.RoutableKey; import accord.primitives.SyncPoint; import accord.primitives.Timestamp; -import accord.primitives.Txn; -import accord.primitives.TxnId; -import accord.utils.Invariants; import accord.utils.Timestamped; -import accord.utils.async.AsyncChains; -import accord.utils.async.AsyncResult; -import accord.utils.async.AsyncResults.SettableResult; - -import static accord.primitives.Routables.Slice.Minimal; public class ListStore implements DataStore { - static class SyncResult extends SettableResult<Ranges> implements FetchResult - { - final SyncCoordinator coordinator; - - SyncResult(SyncCoordinator coordinator) - { - this.coordinator = coordinator; - } - - @Override - public void abort(Ranges abort) - { - coordinator.abort(abort); - } - } - static class SyncCoordinator extends FetchCoordinator - { - static class Key - { - final Node.Id id; - final Ranges ranges; - - Key(Node.Id id, Ranges ranges) - { - this.id = id; - this.ranges = ranges; - } - - @Override - public int hashCode() - { - return id.hashCode() + ranges.hashCode(); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) return true; - if (!(obj instanceof Key)) return false; - Key that = (Key) obj; - return id.equals(that.id) && ranges.equals(that.ranges); - } - } - - final FetchRanges fetchRanges; - final CommandStore commandStore; - final ListStore dataStore; - final Map<Key, StartingRangeFetch> inflight = new HashMap<>(); - final SyncResult done = new SyncResult(this); - final List<AsyncResult<Void>> persisting = new ArrayList<>(); - - private SyncCoordinator(Node node, Ranges ranges, SyncPoint syncPoint, FetchRanges fetchRanges, CommandStore commandStore, ListStore dataStore) - { - super(node, ranges, syncPoint, fetchRanges); - this.fetchRanges = fetchRanges; - this.commandStore = commandStore; - this.dataStore = dataStore; - } - - @Override - public void contact(Node.Id to, Ranges ranges) - { - Key key = new Key(to, ranges); - inflight.put(key, starting(to, ranges)); - Ranges ownedRanges = ownedRangesForNode(to); - Invariants.checkArgument(ownedRanges.containsAll(ranges)); - PartialDeps partialDeps = syncPoint.waitFor.slice(ownedRanges, ranges); - PartialTxn partialTxn = new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, new ListRead(unsafeStore -> unsafeStore, ranges, ranges), new ListQuery(Node.Id.NONE, Long.MIN_VALUE), null); - node.send(to, new StoreSync(syncPoint.sourceEpoch(), syncPoint.syncId, ranges, partialDeps, partialTxn), new Callback<ReadReply>() - { - @Override - public void onSuccess(Node.Id from, ReadReply reply) - { - if (!reply.isOk()) - { - fail(to, new RuntimeException(reply.toString())); - inflight.remove(key).cancel(); - switch ((ReadNack) reply) - { - default: throw new AssertionError("Unhandled enum"); - case Invalid: - case Redundant: - case NotCommitted: - throw new AssertionError(); - case Error: - // TODO (required): ensure errors are propagated to coordinators and can be logged - } - return; - } - - SyncReply ok = (SyncReply) reply; - Ranges received; - if (ok.unavailable != null) - { - unavailable(to, ok.unavailable); - if (ok.data == null) - { - inflight.remove(key).cancel(); - return; - } - received = ranges.difference(ok.unavailable); - } - else - { - received = ranges; - } - - // TODO (now): make sure it works if invoked in either order - inflight.remove(key).started(ok.maxApplied); - ListData data = (ListData) ok.data; - if (data != null) - { - persisting.add(commandStore.execute(PreLoadContext.empty(), safeStore -> { - data.forEach((key, value) -> dataStore.data.merge(key, value, Timestamped::merge)); - }).addCallback((ignore, fail) -> { - synchronized (this) - { - if (fail == null) success(to, received); - else fail(to, received, fail); - } - }).beginAsResult()); - } - // received must be invoked after submitting the persistence future, as it triggers onDone - // which creates a ReducingFuture over {@code persisting} - } - - @Override - public void onFailure(Node.Id from, Throwable failure) - { - inflight.remove(key).cancel(); - fail(from, failure); - } - - @Override - public void onCallbackFailure(Node.Id from, Throwable failure) - { - // TODO (soon) - failure.printStackTrace(); - } - }); - } - - @Override - protected void onDone(Ranges success, Throwable failure) - { - if (success.isEmpty()) done.setFailure(failure); - else if (persisting.isEmpty()) done.setSuccess(null); - else AsyncChains.reduce(persisting, (a, b)-> null) - .begin((s, f) -> { - if (f == null) done.setSuccess(ranges); - else done.setFailure(f); - }); - } - - @Override - protected void start() - { - super.start(); - } - - void abort(Ranges abort) - { - // TODO (required, later): implement abort - } - } - - static class StoreSync extends WaitAndReadData - { - final PartialDeps partialDeps; - Timestamp maxApplied; - - StoreSync(long sourceEpoch, TxnId syncId, Ranges ranges, PartialDeps partialDeps, PartialTxn partialTxn) - { - - super(ranges, sourceEpoch, Status.Applied, partialDeps, Timestamp.MAX, syncId, partialTxn); - this.partialDeps = partialDeps; - } - - @Override - protected void readComplete(CommandStore commandStore, Data result, Ranges unavailable) - { - commandStore.execute(PreLoadContext.empty(), safeStore -> { - Ranges slice = safeStore.ranges().allAt(executeReadAt).difference(unavailable); - Timestamp newMaxApplied = ((InMemoryCommandStore.InMemorySafeStore)safeStore).maxApplied(readScope, slice); - synchronized (this) - { - if (maxApplied == null) maxApplied = newMaxApplied; - else maxApplied = Timestamp.max(maxApplied, newMaxApplied); - Ranges reportUnavailable = unavailable.slice((Ranges)this.readScope, Minimal); - super.readComplete(commandStore, result, reportUnavailable); - } - }).begin(node.agent()); - } - - @Override - protected void reply(@Nullable Ranges unavailable, @Nullable Data data) - { - node.reply(replyTo, replyContext, new SyncReply(unavailable, data, maxApplied)); - } - } - - static class SyncReply extends ReadOk - { - final Timestamp maxApplied; - public SyncReply(@Nullable Ranges unavailable, @Nullable Data data, Timestamp maxApplied) - { - super(unavailable, data); - this.maxApplied = maxApplied; - } - } - static final Timestamped<int[]> EMPTY = new Timestamped<>(Timestamp.NONE, new int[0]); final NavigableMap<RoutableKey, Timestamped<int[]>> data = new TreeMap<>(); @@ -300,8 +69,8 @@ public class ListStore implements DataStore @Override public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback) { - SyncCoordinator coordinator = new SyncCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore(), this); + ListFetchCoordinator coordinator = new ListFetchCoordinator(node, ranges, syncPoint, callback, safeStore.commandStore(), this); coordinator.start(); - return coordinator.done; + return coordinator.result(); } } diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index d2bc2337..c3516377 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -119,7 +119,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new); awaitUninterruptibly(node.start()); - node.onTopologyUpdate(topology); + node.onTopologyUpdate(topology, true); return node; } diff --git a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java index 45e6a9b7..bae0cec5 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java +++ b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java @@ -35,6 +35,7 @@ public class MockConfigurationService implements TestableConfigurationService { private final MessageSink messageSink; private final List<Topology> epochs = new ArrayList<>(); + private final Map<Long, EpochReady> acks = new HashMap<>(); private final List<AsyncResult<Void>> syncs = new ArrayList<>(); private final List<Listener> listeners = new ArrayList<>(); private final EpochFunction<MockConfigurationService> fetchTopologyHandler; @@ -81,34 +82,40 @@ public class MockConfigurationService implements TestableConfigurationService } @Override - public void acknowledgeEpoch(EpochReady epoch) + public synchronized void acknowledgeEpoch(EpochReady epoch) { + Assertions.assertFalse(acks.containsKey(epoch.epoch)); + acks.put(epoch.epoch, epoch); + } + + public synchronized EpochReady ackFor(long epoch) + { + return acks.get(epoch); } @Override - public synchronized AsyncResult<Void> reportTopology(Topology topology) + public synchronized void reportTopology(Topology topology) { if (topology.epoch() > epochs.size()) - return syncs.get((int)topology.epoch() - 1); + return; Assertions.assertEquals(topology.epoch(), epochs.size()); epochs.add(topology); List<AsyncResult<Void>> futures = new ArrayList<>(); for (Listener listener : listeners) - futures.add(listener.onTopologyUpdate(topology)); + futures.add(listener.onTopologyUpdate(topology, true)); AsyncResult<Void> result = futures.isEmpty() ? AsyncResults.success(null) : AsyncChains.reduce(futures, (a, b) -> null).beginAsResult(); syncs.add(result); - return result; } public synchronized void reportSyncComplete(Node.Id node, long epoch) { for (Listener listener : listeners) - listener.onEpochSyncComplete(node, epoch); + listener.onRemoteSyncComplete(node, epoch); } } diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index c39f46a1..a1cfa215 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -100,7 +100,7 @@ public class ImmutableCommandTest new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null, SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new); awaitUninterruptibly(node.start()); - node.onTopologyUpdate(storeSupport.local.get()); + node.onTopologyUpdate(storeSupport.local.get(), true); return node; } diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index bec84683..36825cb6 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -152,23 +152,27 @@ public class TopologyManagerTest { Range range = range(100, 200); Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))); - Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(2, 3))); + Topology topology2 = topology(2, shard(range, idList(1, 2, 3), idSet(1, 2))); + Topology topology3 = topology(3, shard(range, idList(1, 2, 3), idSet(2, 3))); TopologyManager service = new TopologyManager(SUPPLIER, ID); Assertions.assertSame(Topology.EMPTY, service.current()); service.onTopologyUpdate(topology1); service.onTopologyUpdate(topology2); + service.onTopologyUpdate(topology3); Assertions.assertFalse(service.getEpochStateUnsafe(2).syncComplete()); RoutingKeys keys = keys(150).toUnseekables(); - Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.forSelection(keys, Topology.OnUnknown.REJECT)), - service.withUnsyncedEpochs(keys, 2, 2)); + Assertions.assertEquals(topologies(topology3.forSelection(keys, Topology.OnUnknown.REJECT), topology2.forSelection(keys, Topology.OnUnknown.REJECT), topology1.withEmptySubset()), + service.withUnsyncedEpochs(keys, 3, 3)); service.onEpochSyncComplete(id(2), 2); service.onEpochSyncComplete(id(3), 2); - Assertions.assertEquals(topologies(topology2.forSelection(keys, Topology.OnUnknown.REJECT)), - service.withUnsyncedEpochs(keys, 2, 2)); + service.onEpochSyncComplete(id(2), 3); + service.onEpochSyncComplete(id(3), 3); + Assertions.assertEquals(topologies(topology3.forSelection(keys, Topology.OnUnknown.REJECT)), + service.withUnsyncedEpochs(keys, 3, 3)); } /** @@ -182,22 +186,26 @@ public class TopologyManagerTest shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))); Topology topology2 = topology(2, + shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), + shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))); + Topology topology3 = topology(3, shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); TopologyManager service = new TopologyManager(SUPPLIER, ID); service.onTopologyUpdate(topology1); service.onTopologyUpdate(topology2); + service.onTopologyUpdate(topology3); - // no acks, so all epoch 1 shards should be included - Assertions.assertEquals(topologies(topology2, topology1), - service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2)); + // no acks, so all epoch 2 shards should be included + Assertions.assertEquals(topologies(topology3, topology2, topology1.withEmptySubset()), + service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 3, 3)); // first topology acked, so only the second shard should be included - service.onEpochSyncComplete(id(1), 1); - service.onEpochSyncComplete(id(2), 1); - Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 2, 2); - Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))), + service.onEpochSyncComplete(id(1), 2); + service.onEpochSyncComplete(id(2), 2); + Topologies actual = service.withUnsyncedEpochs(keys(150, 250).toUnseekables(), 3, 3); + Assertions.assertEquals(topologies(topology3, topology(2, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))), topology1.withEmptySubset()), actual); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org