This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit fe8f473e4257987ea4dd10e5e96e7801df85b6d6 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Thu May 25 15:56:24 2023 -0700 Accord TCM integration Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-18444 --- .../main/java/accord/api/ConfigurationService.java | 6 + .../accord/impl/AbstractConfigurationService.java | 279 +++++++++++++++++++ accord-core/src/main/java/accord/local/Node.java | 7 +- .../src/main/java/accord/topology/Topology.java | 2 +- .../main/java/accord/topology/TopologyManager.java | 35 ++- accord-core/src/test/java/accord/Utils.java | 22 +- .../accord/burn/BurnTestConfigurationService.java | 167 ++---------- .../impl/AbstractConfigurationServiceTest.java | 302 +++++++++++++++++++++ .../test/java/accord/impl/mock/MockCluster.java | 1 + .../java/accord/local/ImmutableCommandTest.java | 1 + .../test/java/accord/messages/PreAcceptTest.java | 3 +- .../java/accord/topology/TopologyManagerTest.java | 63 +++++ 12 files changed, 721 insertions(+), 167 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index 7b868acd..f7ecfece 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -118,6 +118,12 @@ public interface ConfigurationService * This should be invoked on each replica once EpochReady.coordination has returned on a replica. 
*/ void onEpochSyncComplete(Node.Id node, long epoch); + + /** + * Called when the configuration service is meant to truncate its topology data up to (but not including) + * the given epoch + */ + void truncateTopologyUntil(long epoch); } void registerListener(Listener listener); diff --git a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java new file mode 100644 index 00000000..6d2aaf39 --- /dev/null +++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.primitives.Ints; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.ConfigurationService; +import accord.local.Node; +import accord.topology.Topology; +import accord.utils.Invariants; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; + +public abstract class AbstractConfigurationService implements ConfigurationService +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractConfigurationService.class); + + protected final Node.Id node; + + protected final EpochHistory epochs = new EpochHistory(); + + protected final List<Listener> listeners = new ArrayList<>(); + + static class EpochState + { + private final long epoch; + private final AsyncResult.Settable<Topology> received = AsyncResults.settable(); + private final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); + + private Topology topology = null; + + public EpochState(long epoch) + { + this.epoch = epoch; + } + + public long epoch() + { + return epoch; + } + + @Override + public String toString() + { + return "EpochState{" + epoch + '}'; + } + } + + @VisibleForTesting + protected static class EpochHistory + { + // TODO (low priority): move pendingEpochs / FetchTopology into here? + private List<EpochState> epochs = new ArrayList<>(); + + protected long lastReceived = 0; + private long lastAcknowledged = 0; + + long minEpoch() + { + return epochs.isEmpty() ? 0L : epochs.get(0).epoch; + } + + long maxEpoch() + { + int size = epochs.size(); + return size == 0 ? 
0L : epochs.get(size - 1).epoch; + } + + @VisibleForTesting + EpochState atIndex(int idx) + { + return epochs.get(idx); + } + + @VisibleForTesting + int size() + { + return epochs.size(); + } + + EpochState getOrCreate(long epoch) + { + Invariants.checkArgument(epoch > 0); + if (epochs.isEmpty()) + { + EpochState state = new EpochState(epoch); + epochs.add(state); + return state; + } + + long minEpoch = minEpoch(); + if (epoch < minEpoch) + { + int prepend = Ints.checkedCast(minEpoch - epoch); + List<EpochState> next = new ArrayList<>(epochs.size() + prepend); + for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++) + next.add(new EpochState(addEpoch)); + next.addAll(epochs); + epochs = next; + minEpoch = minEpoch(); + Invariants.checkState(minEpoch == epoch); + } + long maxEpoch = maxEpoch(); + int idx = Ints.checkedCast(epoch - minEpoch); + + // add any missing epochs + for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++) + epochs.add(new EpochState(addEpoch)); + + return epochs.get(idx); + } + + public EpochHistory receive(Topology topology) + { + long epoch = topology.epoch(); + Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || lastReceived == 0); + lastReceived = epoch; + EpochState state = getOrCreate(epoch); + if (state != null) + { + state.topology = topology; + state.received.setSuccess(topology); + } + return this; + } + + AsyncResult<Topology> receiveFuture(long epoch) + { + return getOrCreate(epoch).received; + } + + Topology topologyFor(long epoch) + { + return getOrCreate(epoch).topology; + } + + public EpochHistory acknowledge(long epoch) + { + Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 || lastAcknowledged == 0); + lastAcknowledged = epoch; + getOrCreate(epoch).acknowledged.setSuccess(null); + return this; + } + + AsyncResult<Void> acknowledgeFuture(long epoch) + { + return getOrCreate(epoch).acknowledged; + } + + void truncateUntil(long epoch) + { + Invariants.checkArgument(epoch <= maxEpoch()); + 
long minEpoch = minEpoch(); + int toTrim = Ints.checkedCast(epoch - minEpoch); + if (toTrim <=0) + return; + + epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size())); + } + } + + public AbstractConfigurationService(Node.Id node) + { + this.node = node; + } + + @Override + public synchronized void registerListener(Listener listener) + { + listeners.add(listener); + } + + @Override + public synchronized Topology currentTopology() + { + return epochs.topologyFor(epochs.lastReceived); + } + + @Override + public synchronized Topology getTopologyForEpoch(long epoch) + { + return epochs.topologyFor(epoch); + } + + protected abstract void fetchTopologyInternal(long epoch); + + @Override + public synchronized void fetchTopologyForEpoch(long epoch) + { + if (epoch <= epochs.lastReceived) + return; + + fetchTopologyInternal(epoch); + } + + protected abstract void epochSyncComplete(Topology topology ); + + @Override + public synchronized void acknowledgeEpoch(EpochReady ready) + { + ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch)); + ready.coordination.addCallback(() -> epochSyncComplete(epochs.getOrCreate(ready.epoch).topology)); + } + + protected void topologyUpdatePreListenerNotify(Topology topology) {} + protected void topologyUpdatePostListenerNotify(Topology topology) {} + + public synchronized AsyncResult<Void> reportTopology(Topology topology) + { + long lastReceived = epochs.lastReceived; + if (topology.epoch() <= lastReceived) + return AsyncResults.success(null); + + if (lastReceived > 0 && topology.epoch() > lastReceived + 1) + { + fetchTopologyForEpoch(lastReceived + 1); + epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology)); + return AsyncResults.success(null); + } + + long lastAcked = epochs.lastAcknowledged; + if (lastAcked > 0 && topology.epoch() > lastAcked + 1) + { + epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology)); + return AsyncResults.success(null); + } + 
logger.trace("Epoch {} received by {}", topology.epoch(), node); + + epochs.receive(topology); + topologyUpdatePreListenerNotify(topology); + for (Listener listener : listeners) + listener.onTopologyUpdate(topology); + topologyUpdatePostListenerNotify(topology); + return AsyncResults.success(null); + } + + protected void epochSyncCompletePreListenerNotify(Node.Id node, long epoch) {} + + public synchronized void epochSyncComplete(Node.Id node, long epoch) + { + epochSyncCompletePreListenerNotify(node, epoch); + for (Listener listener : listeners) + listener.onEpochSyncComplete(node, epoch); + } + + protected void truncateTopologiesPreListenerNotify(long epoch) {} + protected void truncateTopologiesPostListenerNotify(long epoch) {} + + public synchronized void truncateTopologiesUntil(long epoch) + { + truncateTopologiesPreListenerNotify(epoch); + for (Listener listener : listeners) + listener.truncateTopologyUntil(epoch); + truncateTopologiesPostListenerNotify(epoch); + epochs.truncateUntil(epoch); + } +} diff --git a/accord-core/src/main/java/accord/local/Node.java b/accord-core/src/main/java/accord/local/Node.java index 6e97c042..01e83ebb 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -141,7 +141,6 @@ public class Node implements ConfigurationService.Listener, NodeTimeService this.configService = configService; this.topology = new TopologyManager(topologySorter, id); this.nowSupplier = nowSupplier; - Topology topology = configService.currentTopology(); this.now = new AtomicReference<>(Timestamp.fromValues(topology.epoch(), nowSupplier.getAsLong(), id)); this.agent = agent; this.random = random; @@ -199,6 +198,12 @@ public class Node implements ConfigurationService.Listener, NodeTimeService topology.onEpochSyncComplete(node, epoch); } + @Override + public void truncateTopologyUntil(long epoch) + { + topology.truncateTopologyUntil(epoch); + } + public void withEpoch(long epoch, Runnable 
runnable) { if (topology.hasEpoch(epoch)) diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 728553c1..56b251e9 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -103,7 +103,7 @@ public class Topology @Override public String toString() { - return "Topology{" + "epoch=" + epoch + ", " + super.toString() + '}'; + return "Topology{" + "epoch=" + epoch + ", " + Arrays.toString(shards) + '}'; } @Override diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index aea8c535..9c083a5c 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -68,6 +68,7 @@ public class TopologyManager this.global = checkArgument(global, !global.isSubset()); this.local = global.forNode(node).trim(); Invariants.checkArgument(!global().isSubset()); + // TODO: can we just track sync for local ranges here? this.syncTracker = new QuorumTracker(new Single(sorter, global())); this.syncComplete = syncComplete; this.prevSynced = prevSynced; @@ -129,6 +130,7 @@ public class TopologyManager private static class Epochs { + private static final Epochs EMPTY = new Epochs(new EpochState[0]); private final long currentEpoch; private final EpochState[] epochs; // nodes we've received sync complete notifications from, for epochs we do not yet have topologies for. @@ -174,6 +176,16 @@ public class TopologyManager return current().epoch + 1; } + public long minEpoch() + { + return currentEpoch - epochs.length + 1; + } + + public long epoch() + { + return currentEpoch; + } + public Topology current() { return epochs.length > 0 ? 
epochs[0].global() : Topology.EMPTY; @@ -197,6 +209,8 @@ public class TopologyManager else { EpochState state = get(epoch); + if (state == null) + return; state.recordSyncComplete(node); for (epoch++ ; state.syncComplete() && epoch <= currentEpoch; epoch++) { @@ -223,14 +237,15 @@ public class TopologyManager { this.sorter = sorter; this.node = node; - this.epochs = new Epochs(new EpochState[0]); + this.epochs = Epochs.EMPTY; } public synchronized void onTopologyUpdate(Topology topology) { Epochs current = epochs; - checkArgument(topology.epoch == current.nextEpoch(), "Expected topology update %d to be %d", topology.epoch, current.nextEpoch()); + checkArgument(topology.epoch == current.nextEpoch() || epochs == Epochs.EMPTY, + "Expected topology update %d to be %d", topology.epoch, current.nextEpoch()); EpochState[] nextEpochs = new EpochState[current.epochs.length + 1]; List<Set<Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete); Set<Id> alreadySyncd = Collections.emptySet(); @@ -274,6 +289,22 @@ public class TopologyManager epochs.syncComplete(node, epoch); } + public synchronized void truncateTopologyUntil(long epoch) + { + Epochs current = epochs; + checkArgument(current.epoch() >= epoch); + + if (current.minEpoch() >= epoch) + return; + + int newLen = current.epochs.length - (int) (epoch - current.minEpoch()); + Invariants.checkState(current.epochs[newLen - 1].syncComplete()); + + EpochState[] nextEpochs = new EpochState[newLen]; + System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen); + epochs = new Epochs(nextEpochs, current.pendingSyncComplete, current.futureEpochFutures); + } + public TopologySorter.Supplier sorter() { return sorter; diff --git a/accord-core/src/test/java/accord/Utils.java b/accord-core/src/test/java/accord/Utils.java index b5f38637..3b554161 100644 --- a/accord-core/src/test/java/accord/Utils.java +++ b/accord-core/src/test/java/accord/Utils.java @@ -138,17 +138,17 @@ public class Utils MockStore store = new MockStore(); 
Scheduler scheduler = new ThreadPoolScheduler(); Node node = new Node(nodeId, - messageSink, - new MockConfigurationService(messageSink, EpochFunction.noop(), topology), - clock, - () -> store, - new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), - new TestAgent(), - new DefaultRandom(), - scheduler, - SizeOfIntersectionSorter.SUPPLIER, - SimpleProgressLog::new, - InMemoryCommandStores.Synchronized::new); + messageSink, + new MockConfigurationService(messageSink, EpochFunction.noop(), topology), + clock, + () -> store, + new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), + new TestAgent(), + new DefaultRandom(), + scheduler, + SizeOfIntersectionSorter.SUPPLIER, + SimpleProgressLog::new, + InMemoryCommandStores.Synchronized::new); awaitUninterruptibly(node.start()); return node; } diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index 0c5aa2a6..855366f6 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -20,17 +20,13 @@ package accord.burn; import accord.api.TestableConfigurationService; import accord.local.AgentExecutor; +import accord.impl.AbstractConfigurationService; import accord.utils.RandomSource; import accord.local.Node; import accord.messages.*; import accord.topology.Topology; -import accord.utils.Invariants; -import accord.utils.async.AsyncResult; import accord.utils.async.AsyncResults; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -38,124 +34,22 @@ import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; -public class BurnTestConfigurationService implements TestableConfigurationService +public class BurnTestConfigurationService extends AbstractConfigurationService 
implements TestableConfigurationService { - private static final Logger logger = LoggerFactory.getLogger(BurnTestConfigurationService.class); - - private final Node.Id node; private final AgentExecutor executor; private final Function<Node.Id, Node> lookup; private final Supplier<RandomSource> randomSupplier; - private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>(); - - private final EpochHistory epochs = new EpochHistory(); - private final List<Listener> listeners = new ArrayList<>(); private final TopologyUpdates topologyUpdates; - - private static class EpochState - { - private final long epoch; - private final AsyncResult.Settable<Topology> received = AsyncResults.settable(); - private final AsyncResult.Settable<Void> acknowledged = AsyncResults.settable(); - private final AsyncResult.Settable<Void> synced = AsyncResults.settable(); - - private Topology topology = null; - - public EpochState(long epoch) - { - this.epoch = epoch; - } - } - - private static class EpochHistory - { - // TODO (low priority): move pendingEpochs / FetchTopology into here? 
- private final List<EpochState> epochs = new ArrayList<>(); - - private long lastReceived = 0; - private long lastAcknowledged = 0; - private long lastSyncd = 0; - - private EpochState get(long epoch) - { - for (long addEpoch = epochs.size() - 1; addEpoch <= epoch; addEpoch++) - epochs.add(new EpochState(addEpoch)); - return epochs.get((int) epoch); - } - - EpochHistory receive(Topology topology) - { - long epoch = topology.epoch(); - Invariants.checkState(epoch == 0 || lastReceived == epoch - 1); - lastReceived = epoch; - EpochState state = get(epoch); - state.topology = topology; - state.received.setSuccess(topology); - return this; - } - - AsyncResult<Topology> receiveFuture(long epoch) - { - return get(epoch).received; - } - - Topology topologyFor(long epoch) - { - return get(epoch).topology; - } - - EpochHistory acknowledge(long epoch) - { - Invariants.checkState(epoch == 0 || lastAcknowledged == epoch - 1); - lastAcknowledged = epoch; - get(epoch).acknowledged.setSuccess(null); - return this; - } - - AsyncResult<Void> acknowledgeFuture(long epoch) - { - return get(epoch).acknowledged; - } - - EpochHistory syncComplete(long epoch) - { - Invariants.checkState(epoch == 0 || lastSyncd == epoch - 1); - EpochState state = get(epoch); - Invariants.checkState(state.received.isDone()); - Invariants.checkState(state.acknowledged.isDone()); - lastSyncd = epoch; - get(epoch).synced.setSuccess(null); - return this; - } - } + private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>(); public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, Node> lookup, TopologyUpdates topologyUpdates) { - this.node = node; + super(node); this.executor = executor; this.randomSupplier = randomSupplier; this.lookup = lookup; this.topologyUpdates = topologyUpdates; - epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0); - epochs.receive(topology).acknowledge(1).syncComplete(1); 
- } - - @Override - public synchronized void registerListener(Listener listener) - { - listeners.add(listener); - } - - @Override - public synchronized Topology currentTopology() - { - return epochs.topologyFor(epochs.lastReceived); - } - - @Override - public synchronized Topology getTopologyForEpoch(long epoch) - { - return epochs.topologyFor(epoch); + reportTopology(topology); } private static class FetchTopologyRequest implements Request @@ -257,58 +151,29 @@ public class BurnTestConfigurationService implements TestableConfigurationServic } @Override - public synchronized void fetchTopologyForEpoch(long epoch) + protected void fetchTopologyInternal(long epoch) { - if (epoch <= epochs.lastReceived) - return; - - for (long e = epochs.lastReceived + 1; e < epoch ; ++e) - pendingEpochs.computeIfAbsent(epoch, FetchTopology::new); + pendingEpochs.computeIfAbsent(epoch, FetchTopology::new); } @Override - public synchronized void acknowledgeEpoch(EpochReady ready) - { - ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch)); - ready.coordination.addCallback(() -> topologyUpdates.syncComplete(lookup.apply(node), epochs.get(ready.epoch).topology.nodes(), ready.epoch)); - } - - private Node originator() + protected void epochSyncComplete(Topology topology) { - return lookup.apply(node); + topologyUpdates.syncComplete(lookup.apply(node), topology.nodes(), topology.epoch()); } @Override - public synchronized AsyncResult<Void> reportTopology(Topology topology) + protected void topologyUpdatePostListenerNotify(Topology topology) { - long lastReceived = epochs.lastReceived; - if (topology.epoch() <= lastReceived) - return AsyncResults.success(null); - - if (topology.epoch() > lastReceived + 1) - { - fetchTopologyForEpoch(lastReceived + 1); - epochs.receiveFuture(lastReceived + 1).addCallback(() -> reportTopology(topology)); - return AsyncResults.success(null); - } - - long lastAcked = epochs.lastAcknowledged; - if (topology.epoch() > lastAcked + 1) - { - 
epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> reportTopology(topology)); - return AsyncResults.success(null); - } - logger.trace("Epoch {} received by {}", topology.epoch(), node); - - epochs.receive(topology); - for (Listener listener : listeners) - listener.onTopologyUpdate(topology); - FetchTopology fetch = pendingEpochs.remove(topology.epoch()); if (fetch == null) - return AsyncResults.success(null); + return; fetch.setSuccess(null); - return AsyncResults.success(null); + } + + private Node originator() + { + return lookup.apply(node); } } diff --git a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java new file mode 100644 index 00000000..d0e6ee03 --- /dev/null +++ b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package accord.impl; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import accord.api.ConfigurationService.EpochReady; +import accord.utils.async.AsyncResult; +import accord.utils.async.AsyncResults; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import accord.api.ConfigurationService; +import accord.impl.AbstractConfigurationService.EpochHistory; +import accord.local.Node.Id; +import accord.primitives.Range; +import accord.topology.Shard; +import accord.topology.Topology; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class AbstractConfigurationServiceTest +{ + public static class TestListener implements ConfigurationService.Listener + { + private final ConfigurationService parent; + private final boolean ackTopologies; + final Map<Long, Topology> topologies = new HashMap<>(); + final Map<Long, Set<Id>> syncCompletes = new HashMap<>(); + final Set<Long> truncates = new HashSet<>(); + + public TestListener(ConfigurationService parent, boolean ackTopologies) + { + this.parent = parent; + this.ackTopologies = ackTopologies; + } + + @Override + public AsyncResult<Void> onTopologyUpdate(Topology topology) + { + if (topologies.put(topology.epoch(), topology) != null) + Assertions.fail("Received topology twice for epoch " + topology.epoch()); + if (ackTopologies) + parent.acknowledgeEpoch(EpochReady.done(topology.epoch())); + return AsyncResults.success(null); + } + + @Override + public void onEpochSyncComplete(Id node, long epoch) + { + Set<Id> synced = syncCompletes.computeIfAbsent(epoch, e -> new HashSet<>()); + if (!synced.add(node)) + Assertions.fail(String.format("Recieved multiple syncs for epoch %s from %s", epoch, node)); + } + + @Override + public void truncateTopologyUntil(long epoch) + { + if (!truncates.add(epoch)) 
+ Assertions.fail(String.format("Recieved multiple truncates for epoch", epoch)); + } + + public void assertNoTruncates() + { + Assertions.assertTrue(truncates.isEmpty()); + } + + public void assertTruncates(Long... epochs) + { + Assertions.assertEquals(ImmutableSet.copyOf(epochs), truncates); + } + + public void assertSyncsFor(Long... epochs) + { + Assertions.assertEquals(ImmutableSet.copyOf(epochs), syncCompletes.keySet()); + } + + public void assertSyncsForEpoch(long epoch, Id... nodes) + { + Assertions.assertEquals(ImmutableSet.copyOf(nodes), syncCompletes.get(epoch)); + } + + public void assertTopologiesFor(Long... epochs) + { + Assertions.assertEquals(ImmutableSet.copyOf(epochs), topologies.keySet()); + } + + public void assertTopologyForEpoch(long epoch, Topology topology) + { + Assertions.assertEquals(topology, topologies.get(epoch)); + } + } + + private static class TestableConfigurationService extends AbstractConfigurationService + { + final Set<Long> syncStarted = new HashSet<>(); + final Set<Long> epochsFetched = new HashSet<>(); + + public TestableConfigurationService(Id node) + { + super(node); + } + + @Override + protected void fetchTopologyInternal(long epoch) + { + epochsFetched.add(epoch); + } + + @Override + protected void epochSyncComplete(Topology topology) + { + if (!syncStarted.add(topology.epoch())) + Assertions.fail("Sync started multiple times for " + topology.epoch()); + } + + @Override + protected void topologyUpdatePostListenerNotify(Topology topology) + { + acknowledgeEpoch(EpochReady.done(topology.epoch())); + } + } + + private static final Id ID1 = new Id(1); + private static final Id ID2 = new Id(2); + private static final Id ID3 = new Id(3); + private static final List<Id> NODES = ImmutableList.of(ID1, ID2, ID3); + private static final Range RANGE = IntKey.range(0, 100); + + private static Shard shard(Range range, List<Id> nodes, Set<Id> fastPath) + { + return new Shard(range, nodes, fastPath); + } + + private static Topology 
topology(long epoch, Range range, List<Id> nodes, Set<Id> fastPath) + { + return new Topology(epoch, shard(range, nodes, fastPath)); + } + + private static Topology topology(long epoch, Id... fastPath) + { + return topology(epoch, RANGE, NODES, ImmutableSet.copyOf(fastPath)); + } + + private static Topology topology(long epoch, int... fastPath) + { + Set<Id> fpSet = Arrays.stream(fastPath).mapToObj(Id::new).collect(Collectors.toSet()); + return topology(epoch, RANGE, NODES, fpSet); + } + + private static final Topology TOPOLOGY1 = topology(1, 1, 2, 3); + private static final Topology TOPOLOGY2 = topology(2, 1, 2); + private static final Topology TOPOLOGY3 = topology(3, 1, 3); + private static final Topology TOPOLOGY4 = topology(4, 2, 3); + + @Test + public void getTopologyTest() + { + TestableConfigurationService service = new TestableConfigurationService(ID1); + TestListener listener = new TestListener(service, false); + service.registerListener(listener); + service.reportTopology(TOPOLOGY1); + service.reportTopology(TOPOLOGY2); + service.reportTopology(TOPOLOGY3); + service.reportTopology(TOPOLOGY4); + + listener.assertNoTruncates(); + listener.assertTopologiesFor(1L, 2L, 3L, 4L); + Assertions.assertSame(TOPOLOGY1, service.getTopologyForEpoch(1)); + Assertions.assertSame(TOPOLOGY2, service.getTopologyForEpoch(2)); + Assertions.assertSame(TOPOLOGY3, service.getTopologyForEpoch(3)); + Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4)); + } + + /** + * check everything works properly if we start loading after epoch 1 has + * been removed + */ + @Test + public void loadAfterTruncate() + { + TestableConfigurationService service = new TestableConfigurationService(ID1); + TestListener listener = new TestListener(service, false); + service.registerListener(listener); + service.reportTopology(TOPOLOGY3); + service.reportTopology(TOPOLOGY4); + + listener.assertNoTruncates(); + listener.assertTopologiesFor(3L, 4L); + Assertions.assertSame(TOPOLOGY3, 
service.getTopologyForEpoch(3)); + Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4)); + } + + /** + * If we receive topology epochs out of order for some reason, we should + * reorder with callbacks + */ + @Test + public void awaitOutOfOrderTopologies() + { + TestableConfigurationService service = new TestableConfigurationService(ID1); + + TestListener listener = new TestListener(service, false); + service.registerListener(listener); + + service.reportTopology(TOPOLOGY1); + service.reportTopology(TOPOLOGY3); + listener.assertTopologiesFor(1L); + Assertions.assertEquals(ImmutableSet.of(2L), service.epochsFetched); + + service.reportTopology(TOPOLOGY2); + listener.assertTopologiesFor(1L, 2L, 3L); + + } + + private static void assertHistoryEpochs(EpochHistory history, long... expected) + { + Assertions.assertEquals(history.size(), expected.length); + if (expected.length == 0) + return; + + Assertions.assertEquals(expected[0], history.minEpoch()); + Assertions.assertEquals(expected[expected.length - 1], history.maxEpoch()); + + for (int i=0; i<expected.length; i++) + Assertions.assertEquals(expected[i], history.atIndex(i).epoch()); + } + + @Test + public void epochHistoryAppend() + { + EpochHistory history = new EpochHistory(); + Assertions.assertEquals(0, history.size()); + + history.getOrCreate(5); + assertHistoryEpochs(history, 5); + + history.getOrCreate(6); + assertHistoryEpochs(history, 5, 6); + + history.getOrCreate(8); + assertHistoryEpochs(history, 5, 6, 7, 8); + } + + @Test + public void epochHistoryPrepend() + { + EpochHistory history = new EpochHistory(); + Assertions.assertEquals(0, history.size()); + + history.getOrCreate(5); + history.getOrCreate(6); + assertHistoryEpochs(history, 5, 6); + + history.getOrCreate(3); + assertHistoryEpochs(history, 3, 4, 5, 6); + } + + @Test + public void epochHistoryTruncate() + { + EpochHistory history = new EpochHistory(); + Assertions.assertEquals(0, history.size()); + + history.getOrCreate(1); + 
history.getOrCreate(2); + history.getOrCreate(3); + history.getOrCreate(4); + history.getOrCreate(5); + history.getOrCreate(6); + + assertHistoryEpochs(history, 1, 2, 3, 4, 5, 6); + + history.truncateUntil(4); + assertHistoryEpochs(history, 4, 5, 6); + + history.getOrCreate(7); + assertHistoryEpochs(history, 4, 5, 6, 7); + } +} diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index d6a244d9..d2bc2337 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -119,6 +119,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> SimpleProgressLog::new, InMemoryCommandStores.SingleThread::new); awaitUninterruptibly(node.start()); + node.onTopologyUpdate(topology); return node; } diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java index 18bb226d..c39f46a1 100644 --- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java +++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java @@ -100,6 +100,7 @@ public class ImmutableCommandTest new MockCluster.Clock(100), () -> storeSupport.data, new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new TestAgent(), new DefaultRandom(), null, SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 -> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new); awaitUninterruptibly(node.start()); + node.onTopologyUpdate(storeSupport.local.get()); return node; } diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index b3a6994e..ec4f58d9 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -196,7 +196,8 @@ public class PreAcceptTest 
messageSink.assertHistorySizes(0, 1); Assertions.assertEquals(ID3, messageSink.responses.get(0).to); PartialDeps expectedDeps = new PartialDeps(Ranges.of(range(0, 12)), KeyDeps.NONE, RangeDeps.NONE); - Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, Timestamp.fromValues(1, 110, ID1), expectedDeps), + Timestamp expectedTs = Timestamp.fromValues(1, 110, ID1).withExtraFlags(txnId2.flags()); + Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, expectedTs, expectedDeps), messageSink.responses.get(0).payload); } finally diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java index ec85d157..bec84683 100644 --- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java +++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java @@ -200,4 +200,67 @@ public class TopologyManagerTest Assertions.assertEquals(topologies(topology2, topology(1, shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))), actual); } + + /** Feeds the manager epochs 5 and 6 only (no epochs 1-4), then checks that both are retrievable, that epoch 5 reports syncComplete once ids 1..6 have reported, and that there is no state for epoch 4. */ @Test + void incompleteTopologyHistory() + { + Topology topology5 = topology(5, + shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), + shard(range(200, 300), idList(4, 5, 6), idSet(4, 5))); + Topology topology6 = topology(6, + shard(range(100, 200), idList(1, 2, 3), idSet(1, 2)), + shard(range(200, 300), idList(4, 5, 6), idSet(5, 6))); + + TopologyManager service = new TopologyManager(SUPPLIER, ID); + service.onTopologyUpdate(topology5); + service.onTopologyUpdate(topology6); + + Assertions.assertSame(topology6, service.getEpochStateUnsafe(6).global()); + Assertions.assertSame(topology5, service.getEpochStateUnsafe(5).global()); + for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 5); + Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete()); + Assertions.assertNull(service.getEpochStateUnsafe(4)); + + /* NOTE(review): no assertion follows; presumably this only checks that a sync report for epoch 4, which has no state, does not throw - confirm */ service.onEpochSyncComplete(id(1), 4); + } + + /** Reports sync complete for the given epoch on behalf of every node in that epoch's global topology. */ private static void markTopologySynced(TopologyManager service, 
long epoch) + { + service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> service.onEpochSyncComplete(id, epoch)); + } + + /** Applies the topology update to the manager, then marks the topology's epoch as synced through markTopologySynced. */ private static void addAndMarkSynced(TopologyManager service, Topology topology) + { + service.onTopologyUpdate(topology); + markTopologySynced(service, topology.epoch()); + } + + /** Adds and syncs epochs 1..4, then checks via hasEpoch that truncateTopologyUntil(3) drops epochs 1 and 2 while keeping 3 and 4. */ @Test + void truncateTopologyHistory() + { + Range range = range(100, 200); + TopologyManager service = new TopologyManager(SUPPLIER, ID); + addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), idSet(1, 2)))); + addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), idSet(2, 3)))); + addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), idSet(1, 2)))); + addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), idSet(1, 3)))); + + Assertions.assertTrue(service.hasEpoch(1)); + Assertions.assertTrue(service.hasEpoch(2)); + Assertions.assertTrue(service.hasEpoch(3)); + Assertions.assertTrue(service.hasEpoch(4)); + + /* the bound is exclusive in the assertions below: epoch 3 itself is expected to survive */ service.truncateTopologyUntil(3); + Assertions.assertFalse(service.hasEpoch(1)); + Assertions.assertFalse(service.hasEpoch(2)); + Assertions.assertTrue(service.hasEpoch(3)); + Assertions.assertTrue(service.hasEpoch(4)); + + } + + /** NOTE(review): the body is empty, so this test passes without verifying anything - TODO implement the unsynced-epoch truncation check or remove the test. */ @Test + void truncateTopologyCantTruncateUnsyncedEpochs() + { + + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org