This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit c04ea346a543b9a8d2dc9efd29f735c29cb67200 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Thu Mar 10 12:46:17 2022 -0800 Topology reconfiguration fixes Patch by Blake Eggleston; Reviewed by Benedict Elliott Smith --- .../main/java/accord/api/ConfigurationService.java | 7 +- .../src/main/java/accord/coordinate/Agree.java | 28 +- .../main/java/accord/coordinate/Coordinate.java | 8 +- .../tracking/AbstractResponseTracker.java | 6 +- .../accord/coordinate/tracking/ReadTracker.java | 2 +- .../src/main/java/accord/local/Command.java | 2 +- .../src/main/java/accord/local/CommandStore.java | 115 +++---- .../src/main/java/accord/local/CommandStores.java | 341 ++++++++++++--------- .../src/main/java/accord/local/CommandsForKey.java | 5 + accord-core/src/main/java/accord/local/Node.java | 75 +++-- .../src/main/java/accord/messages/Accept.java | 6 +- .../src/main/java/accord/messages/Apply.java | 2 +- .../main/java/accord/messages/BeginRecovery.java | 17 +- .../src/main/java/accord/messages/Commit.java | 2 +- .../src/main/java/accord/messages/PreAccept.java | 6 +- .../src/main/java/accord/messages/ReadData.java | 2 +- .../src/main/java/accord/messages/TxnRequest.java | 120 ++------ .../main/java/accord/messages/WaitOnCommit.java | 8 +- .../src/main/java/accord/topology/KeyRanges.java | 5 + .../src/main/java/accord/topology/Topology.java | 11 +- .../main/java/accord/topology/TopologyManager.java | 111 +++---- accord-core/src/main/java/accord/txn/Keys.java | 123 +++++--- accord-core/src/main/java/accord/txn/Txn.java | 5 +- accord-core/src/main/java/accord/txn/Writes.java | 4 +- .../accord/utils/DeterministicIdentitySet.java | 8 +- .../java/accord/utils/ThreadPoolScheduler.java | 3 +- accord-core/src/test/java/accord/KeysTest.java | 52 ++++ .../src/test/java/accord/burn/BurnTest.java | 2 +- .../accord/burn/BurnTestConfigurationService.java | 130 ++++++-- .../src/test/java/accord/burn/TopologyUpdate.java | 67 +--- .../test/java/accord/coordinate/RecoverTest.java
| 2 +- .../java/accord/coordinate/TopologyChangeTest.java | 17 +- .../src/test/java/accord/impl/basic/Cluster.java | 8 +- .../src/test/java/accord/impl/list/ListData.java | 14 +- .../src/test/java/accord/impl/list/ListWrite.java | 10 + .../src/test/java/accord/impl/mock/EpochSync.java | 4 +- .../test/java/accord/impl/mock/MockCluster.java | 5 +- .../accord/impl/mock/MockConfigurationService.java | 15 +- .../test/java/accord/messages/PreAcceptTest.java | 10 +- .../java/accord/messages/TxnRequestScopeTest.java | 29 +- .../java/accord/topology/TopologyRandomizer.java | 52 +++- .../src/test/java/accord/utils/MessageTask.java | 1 + .../src/main/java/accord/maelstrom/Cluster.java | 4 +- .../main/java/accord/maelstrom/MaelstromData.java | 3 +- .../accord/maelstrom/MaelstromReplyContext.java | 2 + .../src/main/java/accord/maelstrom/Main.java | 4 +- .../java/accord/maelstrom/SimpleConfigService.java | 4 +- 47 files changed, 839 insertions(+), 618 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java b/accord-core/src/main/java/accord/api/ConfigurationService.java index 1089644..7f5bf88 100644 --- a/accord-core/src/main/java/accord/api/ConfigurationService.java +++ b/accord-core/src/main/java/accord/api/ConfigurationService.java @@ -68,11 +68,10 @@ public interface ConfigurationService Topology getTopologyForEpoch(long epoch); /** - * Method for reporting epochs the configuration service may not be aware of, and optionally running a supplied - * runnable once the corresponding topology has been received and applied. If the configuration service is already - * aware of the reported epoch, the runnable should be run immediately. + * Method for reporting epochs the configuration service may not be aware of. 
To be notified when the new epoch + * is available locally, use {@link accord.topology.TopologyManager#awaitEpoch(long)} */ - Future<Void> fetchTopologyForEpoch(long epoch); + void fetchTopologyForEpoch(long epoch); /** * Alert the configuration service of epochs it may not be aware of. This is called called for every TxnRequest diff --git a/accord-core/src/main/java/accord/coordinate/Agree.java b/accord-core/src/main/java/accord/coordinate/Agree.java index 8a3777c..eb91821 100644 --- a/accord-core/src/main/java/accord/coordinate/Agree.java +++ b/accord-core/src/main/java/accord/coordinate/Agree.java @@ -54,7 +54,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> volatile long supersedingEpoch = -1; private final boolean fastPathPermitted; private final Set<Id> successes = new HashSet<>(); - private final Set<Id> failures = new HashSet<>(); + private Set<Id> failures; public PreacceptTracker(Topologies topologies, boolean fastPathPermitted) { @@ -70,6 +70,8 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> @Override public void recordFailure(Id node) { + if (failures == null) + failures = new HashSet<>(); failures.add(node); super.recordFailure(node); } @@ -103,7 +105,8 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> { PreacceptTracker tracker = new PreacceptTracker(topologies, false); successes.forEach(tracker::recordSuccess); - failures.forEach(tracker::recordFailure); + if (failures != null) + failures.forEach(tracker::recordFailure); return tracker; } @@ -112,6 +115,11 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> { return fastPathPermitted && super.hasMetFastPathCriteria(); } + + boolean shouldSlowPathAccept() + { + return (!fastPathPermitted || !hasInFlight()) && hasReachedQuorum(); + } } final Keys keys; @@ -153,7 +161,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> tryFailure(new Timeout()); // if no other responses are expected and the slow 
quorum has been satisfied, proceed - if (shouldSlowPathAccept()) + if (tracker.shouldSlowPathAccept()) onPreAccepted(); } @@ -172,7 +180,7 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> if (!needMessages.isEmpty()) node.send(needMessages, to -> new PreAccept(to, newTopologies, txnId, txn), this); - if (shouldSlowPathAccept()) + if (tracker.shouldSlowPathAccept()) onPreAccepted(); } @@ -197,10 +205,13 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> if (!fastPath && ok.witnessedAt.epoch > txnId.epoch) { if (tracker.recordSupersedingEpoch(ok.witnessedAt.epoch)) - node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate); + { + node.configService().fetchTopologyForEpoch(ok.witnessedAt.epoch); + node.topology().awaitEpoch(ok.witnessedAt.epoch).addListener(this::onEpochUpdate); + } } - if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || shouldSlowPathAccept())) + if (!tracker.hasSupersedingEpoch() && (tracker.hasMetFastPathCriteria() || tracker.shouldSlowPathAccept())) onPreAccepted(); } @@ -236,11 +247,6 @@ class Agree extends AcceptPhase implements Callback<PreAcceptReply> } } - private boolean shouldSlowPathAccept() - { - return (!tracker.fastPathPermitted || !tracker.hasInFlight()) && tracker.hasReachedQuorum(); - } - private boolean isPreAccepted() { return preacceptOutcome != null; diff --git a/accord-core/src/main/java/accord/coordinate/Coordinate.java b/accord-core/src/main/java/accord/coordinate/Coordinate.java index b54d124..c68d5ba 100644 --- a/accord-core/src/main/java/accord/coordinate/Coordinate.java +++ b/accord-core/src/main/java/accord/coordinate/Coordinate.java @@ -14,9 +14,11 @@ public class Coordinate { long executeEpoch = agreed.executeAt.epoch; ConfigurationService configService = node.configService(); - if (executeEpoch > configService.currentEpoch()) - return configService.fetchTopologyForEpoch(executeEpoch) - .flatMap(v -> 
fetchEpochOrExecute(node, agreed)); + if (executeEpoch > node.topology().epoch()) + { + configService.fetchTopologyForEpoch(executeEpoch); + return node.topology().awaitEpoch(executeEpoch).flatMap(v -> fetchEpochOrExecute(node, agreed)); + } return Execute.execute(node, agreed); } diff --git a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java index 9dc6268..942cde7 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/AbstractResponseTracker.java @@ -112,11 +112,11 @@ public abstract class AbstractResponseTracker<T extends AbstractResponseTracker. return false; } - protected <V> V accumulate(BiFunction<T, V, V> function, V start) + protected <V> V foldl(BiFunction<T, V, V> function, V accumulator) { for (T tracker : trackers) - start = function.apply(tracker, start); - return start; + accumulator = function.apply(tracker, accumulator); + return accumulator; } public Set<Node.Id> nodes() diff --git a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java index 100bdd3..3c526a2 100644 --- a/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java +++ b/accord-core/src/main/java/accord/coordinate/tracking/ReadTracker.java @@ -108,7 +108,7 @@ public class ReadTracker extends AbstractResponseTracker<ReadTracker.ReadShardTr */ public Set<Id> computeMinimalReadSetAndMarkInflight() { - Set<ReadShardTracker> toRead = accumulate((tracker, accumulate) -> { + Set<ReadShardTracker> toRead = foldl((tracker, accumulate) -> { if (!tracker.shouldRead()) return accumulate; diff --git a/accord-core/src/main/java/accord/local/Command.java b/accord-core/src/main/java/accord/local/Command.java index a520b60..1e3e1f8 100644 --- 
a/accord-core/src/main/java/accord/local/Command.java +++ b/accord-core/src/main/java/accord/local/Command.java @@ -354,10 +354,10 @@ public class Command implements Listener, Consumer<Listener> { return "Command{" + "txnId=" + txnId + + ", status=" + status + ", txn=" + txn + ", executeAt=" + executeAt + ", deps=" + deps + - ", status=" + status + '}'; } } diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 3241cd7..ae1c153 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -4,12 +4,13 @@ import accord.api.Agent; import accord.api.Key; import accord.api.KeyRange; import accord.api.Store; +import accord.local.CommandStores.StoreGroup; +import accord.local.Node.Id; import accord.topology.KeyRanges; import accord.topology.Topology; import accord.txn.Keys; import accord.txn.Timestamp; import accord.txn.TxnId; -import com.google.common.base.Preconditions; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.Promise; @@ -21,6 +22,9 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import com.google.common.base.Preconditions; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + /** * Single threaded internal shard of accord transaction metadata */ @@ -37,9 +41,6 @@ public abstract class CommandStore Store store, KeyRanges ranges, Supplier<Topology> localTopologySupplier); - Factory SYNCHRONIZED = Synchronized::new; - Factory SINGLE_THREAD = SingleThread::new; - Factory SINGLE_THREAD_DEBUG = SingleThreadDebug::new; } private final int generation; @@ -126,27 +127,6 @@ public abstract class CommandStore return ranges; } - void purgeRanges(KeyRanges removed) - { - for (KeyRange range : removed) - { - NavigableMap<Key, 
CommandsForKey> subMap = commandsForKey.subMap(range.start(), range.startInclusive(), range.end(), range.endInclusive()); - Iterator<Key> keyIterator = subMap.keySet().iterator(); - while (keyIterator.hasNext()) - { - Key key = keyIterator.next(); - CommandsForKey forKey = commandsForKey.get(key); - if (forKey != null) - { - for (Command command : forKey) - if (command.txn() != null && !ranges.intersects(command.txn().keys)) - commands.remove(command.txnId()); - } - keyIterator.remove(); - } - } - } - public void forEpochCommands(KeyRanges ranges, long epoch, Consumer<Command> consumer) { Timestamp minTimestamp = new Timestamp(epoch, Long.MIN_VALUE, Integer.MIN_VALUE, Node.Id.NONE); @@ -193,7 +173,7 @@ public abstract class CommandStore public boolean hashIntersects(Key key) { - return CommandStores.keyIndex(key, numShards) == index; + return StoreGroup.keyIndex(key, numShards) == index; } public boolean intersects(Keys keys) @@ -201,6 +181,11 @@ public abstract class CommandStore return keys.any(ranges, this::hashIntersects); } + public boolean contains(Key key) + { + return ranges.contains(key); + } + public static void onEach(Collection<CommandStore> stores, Consumer<? super CommandStore> consumer) { for (CommandStore store : stores) @@ -234,6 +219,8 @@ public abstract class CommandStore public abstract Future<Void> process(Consumer<? super CommandStore> consumer); + public abstract <T> Future<T> process(Function<? super CommandStore, T> function); + public void processBlocking(Consumer<? 
super CommandStore> consumer) { try @@ -242,7 +229,7 @@ public abstract class CommandStore } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + throw new UncheckedInterruptedException(e); } catch (ExecutionException e) { @@ -254,15 +241,10 @@ public abstract class CommandStore public static class Synchronized extends CommandStore { - public Synchronized(int generation, - int index, - int numShards, - Node.Id nodeId, - Function<Timestamp, Timestamp> uniqueNow, - Agent agent, - Store store, - KeyRanges ranges, - Supplier<Topology> localTopologySupplier) + public Synchronized(int generation, int index, int numShards, Node.Id nodeId, + Function<Timestamp, Timestamp> uniqueNow, + Agent agent, Store store, + KeyRanges ranges, Supplier<Topology> localTopologySupplier) { super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier); } @@ -275,6 +257,14 @@ public abstract class CommandStore return promise; } + @Override + public <T> Future<T> process(Function<? super CommandStore, T> function) + { + AsyncPromise<T> promise = new AsyncPromise<>(); + processInternal(function, promise); + return promise; + } + @Override public void shutdown() {} } @@ -299,15 +289,26 @@ public abstract class CommandStore } } - public SingleThread(int generation, - int index, - int numShards, - Node.Id nodeId, - Function<Timestamp, Timestamp> uniqueNow, - Agent agent, - Store store, - KeyRanges ranges, - Supplier<Topology> localTopologySupplier) + private class FunctionWrapper<T> extends AsyncPromise<T> implements Runnable + { + private final Function<? super CommandStore, T> function; + + public FunctionWrapper(Function<? 
super CommandStore, T> function) + { + this.function = function; + } + + @Override + public void run() + { + processInternal(function, this); + } + } + + public SingleThread(int generation, int index, int numShards, Node.Id nodeId, + Function<Timestamp, Timestamp> uniqueNow, + Agent agent, Store store, + KeyRanges ranges, Supplier<Topology> localTopologySupplier) { super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier); executor = Executors.newSingleThreadExecutor(r -> { @@ -325,6 +326,14 @@ public abstract class CommandStore return future; } + @Override + public <T> Future<T> process(Function<? super CommandStore, T> function) + { + FunctionWrapper<T> future = new FunctionWrapper<>(function); + executor.execute(future); + return future; + } + @Override public void shutdown() { @@ -332,19 +341,14 @@ public abstract class CommandStore } } - public static class SingleThreadDebug extends SingleThread + static class Debug extends SingleThread { private final AtomicReference<Thread> expectedThread = new AtomicReference<>(); - public SingleThreadDebug(int generation, - int index, - int numShards, - Node.Id nodeId, - Function<Timestamp, Timestamp> uniqueNow, - Agent agent, - Store store, - KeyRanges ranges, - Supplier<Topology> localTopologySupplier) + public Debug(int generation, int index, int numShards, Node.Id nodeId, + Function<Timestamp, Timestamp> uniqueNow, + Agent agent, Store store, + KeyRanges ranges, Supplier<Topology> localTopologySupplier) { super(generation, index, numShards, nodeId, uniqueNow, agent, store, ranges, localTopologySupplier); } @@ -405,4 +409,5 @@ public abstract class CommandStore super.processInternal(consumer, future); } } + } diff --git a/accord-core/src/main/java/accord/local/CommandStores.java b/accord-core/src/main/java/accord/local/CommandStores.java index 8e5886c..68efb68 100644 --- a/accord-core/src/main/java/accord/local/CommandStores.java +++ 
b/accord-core/src/main/java/accord/local/CommandStores.java @@ -3,25 +3,36 @@ package accord.local; import accord.api.Agent; import accord.api.Key; import accord.api.Store; +import accord.local.CommandStore.SingleThread; +import accord.local.CommandStores.StoreGroups.Fold; +import accord.local.Node.Id; import accord.messages.TxnRequest; import accord.topology.KeyRanges; import accord.topology.Topology; import accord.txn.Keys; import accord.txn.Timestamp; -import com.google.common.base.Preconditions; import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.function.*; -import java.util.stream.Stream; -import java.util.stream.StreamSupport; + +import static java.lang.Boolean.FALSE; /** * Manages the single threaded metadata shards */ -public class CommandStores +public abstract class CommandStores { + public interface Factory + { + CommandStores create(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store); + } + static class StoreGroup { final CommandStore[] stores; @@ -29,85 +40,53 @@ public class CommandStores public StoreGroup(CommandStore[] stores, KeyRanges ranges) { + Preconditions.checkArgument(stores.length <= 64); this.stores = stores; this.ranges = ranges; } - private static class AccumulatingBitset extends BitSet implements Keys.KeyAccumulator<Keys> + long all() { - final int numStores; - boolean isDone = false; - public AccumulatingBitset(int numStores) - { - super(numStores); - this.numStores = numStores; - } - - @Override - public Keys accumulate(Key key, Keys keys) - { - int idx = keyIndex(key, numStores); - if (get(idx)) - return keys; - set(idx); - isDone = cardinality() == numStores; - return keys; - } - - @Override - public boolean isDone() - { - return 
isDone; - } + return -1L >>> (64 - stores.length); } - public Stream<CommandStore> stream() + long matches(Keys keys) { - return StreamSupport.stream(new ShardSpliterator(stores), false); + return keys.foldl(ranges, StoreGroup::addKeyIndex, stores.length, 0L, -1L); } - public Stream<CommandStore> stream(Keys keys) + long matches(TxnRequest.Scope scope) { - AccumulatingBitset bitSet = new AccumulatingBitset(stores.length); - keys.accumulate(ranges, bitSet, keys); - if (bitSet.cardinality() == 0) - return null; - return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false); + return matches(scope.keys()); } - public Stream<CommandStore> stream(TxnRequest.Scope scope) + static long keyIndex(Key key, long numShards) { - AccumulatingBitset bitSet = new AccumulatingBitset(stores.length); - for (int i=0, mi=scope.size(); i<mi; i++) - { - Keys keys = scope.get(i).keys; - keys.accumulate(ranges, bitSet, keys); - if (bitSet.isDone) - break; - } - if (bitSet.cardinality() == 0) - return null; - return StreamSupport.stream(new ShardSpliterator(stores, bitSet::get), false); + return Integer.toUnsignedLong(key.keyHash()) % numShards; } - } - static int keyIndex(Key key, int numShards) - { - return (int) (Integer.toUnsignedLong(key.keyHash()) % numShards); + private static long addKeyIndex(Key key, long numShards, long accumulate) + { + return accumulate | (1L << keyIndex(key, numShards)); + } } static class StoreGroups { - static final StoreGroups EMPTY = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY); final StoreGroup[] groups; final Topology global; final Topology local; + final int size; public StoreGroups(StoreGroup[] groups, Topology global, Topology local) { this.groups = groups; this.global = global; this.local = local; + int size = 0; + for (StoreGroup group : groups) + size += group.stores.length; + this.size = size; } StoreGroups withNewTopology(Topology global, Topology local) @@ -115,43 +94,79 @@ public class CommandStores return 
new StoreGroups(groups, global, local); } - public Stream<CommandStore> stream() + public int size() { - Stream<CommandStore> stream = null; - for (StoreGroup group : groups) - { - Stream<CommandStore> nextStream = group.stream(); - if (nextStream == null) continue; - stream = stream != null ? Stream.concat(stream, nextStream) : nextStream; - } - - return stream != null ? stream : Stream.empty(); + return size; } - public Stream<CommandStore> stream(Keys keys) + private <I1, I2, O> O foldl(int startGroup, long bitset, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, O accumulator) { - Stream<CommandStore> stream = null; - for (StoreGroup group : groups) + int groupIndex = startGroup; + StoreGroup group = groups[groupIndex]; + int offset = 0; + while (true) { - Stream<CommandStore> nextStream = group.stream(keys); - if (nextStream == null) continue; - stream = stream != null ? Stream.concat(stream, nextStream) : nextStream; + int i = Long.numberOfTrailingZeros(bitset) - offset; + while (i < group.stores.length) + { + accumulator = fold.fold(group.stores[i], param1, param2, accumulator); + bitset ^= Long.lowestOneBit(bitset); + i = Long.numberOfTrailingZeros(bitset) - offset; + } + + if (++groupIndex == groups.length) + break; + + if (bitset == 0) + break; + + offset += group.stores.length; + group = groups[groupIndex]; + if (offset + group.stores.length > 64) + break; } + return accumulator; + } - return stream != null ? stream : Stream.empty(); + interface Fold<I1, I2, O> + { + O fold(CommandStore store, I1 i1, I2 i2, O accumulator); } - public Stream<CommandStore> stream(TxnRequest.Scope scope) + <S, I1, I2, O> O foldl(ToLongBiFunction<StoreGroup, S> select, S scope, Fold<? super I1, ? super I2, O> fold, I1 param1, I2 param2, IntFunction<? 
extends O> factory) { - Stream<CommandStore> stream = null; - for (StoreGroup group : groups) + O accumulator = null; + int startGroup = 0; + while (startGroup < groups.length) { - Stream<CommandStore> nextStream = group.stream(scope); - if (nextStream == null) continue; - stream = stream != null ? Stream.concat(stream, nextStream) : nextStream; + long bits = select.applyAsLong(groups[startGroup], scope); + if (bits == 0) + { + ++startGroup; + continue; + } + + int offset = groups[startGroup].stores.length; + int endGroup = startGroup + 1; + while (endGroup < groups.length) + { + StoreGroup group = groups[endGroup]; + if (offset + group.stores.length > 64) + break; + + bits += select.applyAsLong(group, scope) << offset; + offset += group.stores.length; + ++endGroup; + } + + if (accumulator == null) + accumulator = factory.apply(Long.bitCount(bits)); + + accumulator = foldl(startGroup, bits, fold, param1, param2, accumulator); + startGroup = endGroup; } - return stream != null ? stream : Stream.empty(); + return accumulator; } } @@ -161,7 +176,7 @@ public class CommandStores private final Store store; private final CommandStore.Factory shardFactory; private final int numShards; - private volatile StoreGroups groups = StoreGroups.EMPTY; + protected volatile StoreGroups groups = new StoreGroups(new StoreGroup[0], Topology.EMPTY, Topology.EMPTY); public CommandStores(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory) { @@ -190,19 +205,43 @@ public class CommandStores commandStore.shutdown(); } - public Stream<CommandStore> stream() + protected abstract <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach); + protected abstract <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? 
super CommandStore, T> map, BiFunction<T, T, T> reduce); + + public void forEach(Consumer<CommandStore> forEach) + { + forEach((s, i) -> s.all(), null, forEach); + } + + public void forEach(Keys keys, Consumer<CommandStore> forEach) + { + forEach(StoreGroup::matches, keys, forEach); + } + + public void forEach(TxnRequest.Scope scope, Consumer<CommandStore> forEach) { - return groups.stream(); + forEach(StoreGroup::matches, scope, forEach); } - public Stream<CommandStore> forKeys(Keys keys) + public <T> T mapReduce(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce) { - return groups.stream(keys); + return mapReduce(StoreGroup::matches, scope, map, reduce); } - public Stream<CommandStore> forScope(TxnRequest.Scope scope) + public <T extends Collection<CommandStore>> T collect(Keys keys, IntFunction<T> factory) { - return groups.stream(scope); + return groups.foldl(StoreGroup::matches, keys, CommandStores::append, null, null, factory); + } + + public <T extends Collection<CommandStore>> T collect(TxnRequest.Scope scope, IntFunction<T> factory) + { + return groups.foldl(StoreGroup::matches, scope, CommandStores::append, null, null, factory); + } + + private static <T extends Collection<CommandStore>> T append(CommandStore store, Object ignore1, Object ignore2, T to) + { + to.add(store); + return to; } public synchronized void updateTopology(Topology cluster) @@ -214,8 +253,15 @@ public class CommandStores return; Topology local = cluster.forNode(node); - KeyRanges currentRanges = Arrays.stream(current.groups).map(group -> group.ranges).reduce(KeyRanges.EMPTY, (l, r) -> l.union(r)).mergeTouching(); - KeyRanges added = local.ranges().difference(currentRanges); + KeyRanges added = local.ranges().difference(current.local.ranges()); + + for (StoreGroup group : groups.groups) + { + // FIXME: remove this (and the corresponding check in TopologyRandomizer) once lower bounds are implemented. 
+ // In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty + // convoluted without the ability to jettison epochs. + Preconditions.checkState(!group.ranges.intersects(added)); + } if (added.isEmpty()) { @@ -236,90 +282,97 @@ public class CommandStores groups = new StoreGroups(newGroups, cluster, local); } - private static class ShardSpliterator implements Spliterator<CommandStore> + @VisibleForTesting + public CommandStore unsafeForKey(Key key) { - int i = 0; - final CommandStore[] commandStores; - final IntPredicate predicate; - - public ShardSpliterator(CommandStore[] commandStores, IntPredicate predicate) + for (StoreGroup group : groups.groups) { - this.commandStores = commandStores; - this.predicate = predicate; + if (group.ranges.contains(key)) + { + for (CommandStore store : group.stores) + { + if (store.hashIntersects(key)) + return store; + } + } } + throw new IllegalArgumentException(); + } - public ShardSpliterator(CommandStore[] commandStores) + public static class Synchronized extends CommandStores + { + public Synchronized(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store) { - this (commandStores, i -> true); + super(num, node, uniqueNow, agent, store, CommandStore.Synchronized::new); } @Override - public boolean tryAdvance(Consumer<? super CommandStore> action) + protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce) { - while (i < commandStores.length) - { - int idx = i++; - if (!predicate.test(idx)) - continue; - try - { - commandStores[idx].process(action).get(); - break; - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } - - } - return i < commandStores.length; + return groups.foldl(select, scope, (store, f, r, t) -> t == null ? 
f.apply(store) : r.apply(t, f.apply(store)), map, reduce, ignore -> null); } @Override - public void forEachRemaining(Consumer<? super CommandStore> action) + protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach) { - if (i >= commandStores.length) - return; + groups.foldl(select, scope, (store, f, r, t) -> { f.accept(store); return null; }, forEach, null, ignore -> FALSE); + } + } - List<Future<Void>> futures = new ArrayList<>(commandStores.length - i); - for (; i< commandStores.length; i++) - { - if (predicate.test(i)) - futures.add(commandStores[i].process(action)); - } + public static class SingleThread extends CommandStores + { + public SingleThread(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store) + { + this(num, node, uniqueNow, agent, store, CommandStore.SingleThread::new); + } - try - { - for (int i=0, mi=futures.size(); i<mi; i++) - futures.get(i).get(); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } - catch (ExecutionException e) + public SingleThread(int num, Node.Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store, CommandStore.Factory shardFactory) + { + super(num, node, uniqueNow, agent, store, shardFactory); + } + + private <S, F, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, F f, Fold<F, ?, List<Future<T>>> fold, BiFunction<T, T, T> reduce) + { + List<Future<T>> futures = groups.foldl(select, scope, fold, f, null, ArrayList::new); + T result = null; + for (Future<T> future : futures) { - Throwable cause = e.getCause(); - throw new RuntimeException(cause != null ? 
cause : e); + try + { + T next = future.get(); + if (result == null) result = next; + else result = reduce.apply(result, next); + } + catch (InterruptedException e) + { + throw new UncheckedInterruptedException(e); + } + catch (ExecutionException e) + { + throw new RuntimeException(e.getCause()); + } } + return result; } @Override - public Spliterator<CommandStore> trySplit() + protected <S, T> T mapReduce(ToLongBiFunction<StoreGroup, S> select, S scope, Function<? super CommandStore, T> map, BiFunction<T, T, T> reduce) { - return null; + return mapReduce(select, scope, map, (store, f, i, t) -> { t.add(store.process(f)); return t; }, reduce); } - @Override - public long estimateSize() + protected <S> void forEach(ToLongBiFunction<StoreGroup, S> select, S scope, Consumer<? super CommandStore> forEach) { - return commandStores.length; + mapReduce(select, scope, forEach, (store, f, i, t) -> { t.add(store.process(f)); return t; }, (Void i1, Void i2) -> null); } + } - @Override - public int characteristics() + public static class Debug extends SingleThread + { + public Debug(int num, Id node, Function<Timestamp, Timestamp> uniqueNow, Agent agent, Store store) { - return Spliterator.SIZED | Spliterator.NONNULL | Spliterator.DISTINCT | Spliterator.IMMUTABLE; + super(num, node, uniqueNow, agent, store, CommandStore.Debug::new); } } + } diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java index 20fd1e0..4f53814 100644 --- a/accord-core/src/main/java/accord/local/CommandsForKey.java +++ b/accord-core/src/main/java/accord/local/CommandsForKey.java @@ -60,4 +60,9 @@ public class CommandsForKey implements Listener, Iterable<Command> { return Iterators.concat(uncommitted.values().iterator(), committedByExecuteAt.values().iterator()); } + + public boolean isEmpty() + { + return uncommitted.isEmpty() && committedById.isEmpty(); + } } diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java index 9a37a03..e70e9f1 100644 --- a/accord-core/src/main/java/accord/local/Node.java +++ b/accord-core/src/main/java/accord/local/Node.java @@ -4,10 +4,14 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.IntFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; -import java.util.stream.Stream; + +import com.google.common.annotations.VisibleForTesting; import accord.api.*; import accord.coordinate.Coordinate; @@ -90,7 +94,7 @@ public class Node implements ConfigurationService.Listener private final Set<TxnId> pendingRecovery = Collections.newSetFromMap(new ConcurrentHashMap<>()); public Node(Id id, MessageSink messageSink, ConfigurationService configService, LongSupplier nowSupplier, - Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStore.Factory commandStoreFactory) + Supplier<Store> dataSupplier, Agent agent, Scheduler scheduler, CommandStores.Factory factory) { this.id = id; this.agent = agent; @@ -101,12 +105,7 @@ public class Node implements ConfigurationService.Listener this.now = new AtomicReference<>(new Timestamp(topology.epoch(), nowSupplier.getAsLong(), 0, id)); this.nowSupplier = nowSupplier; this.scheduler = scheduler; - this.commandStores = new CommandStores(numCommandShards(), - id, - this::uniqueNow, - agent, - dataSupplier.get(), - commandStoreFactory); + this.commandStores = factory.create(numCommandShards(), id, this::uniqueNow, agent, dataSupplier.get()); configService.registerListener(this); onTopologyUpdate(topology, false); @@ -188,31 +187,39 @@ public class Node implements ConfigurationService.Listener return nowSupplier.getAsLong(); } - public Stream<CommandStore> local(Keys keys) + public 
void forEachLocal(Consumer<CommandStore> forEach) { - return commandStores.forKeys(keys); + commandStores.forEach(forEach); } - public Stream<CommandStore> local(Txn txn) + public void forEachLocal(Keys keys, Consumer<CommandStore> forEach) { - return commandStores.forKeys(txn.keys()); + commandStores.forEach(keys, forEach); } - public Stream<CommandStore> local(TxnRequest.Scope scope) + public void forEachLocal(Txn txn, Consumer<CommandStore> forEach) { - return commandStores.forScope(scope); + forEachLocal(txn.keys, forEach); } - public Stream<CommandStore> local() + public void forEachLocal(TxnRequest.Scope scope, Consumer<CommandStore> forEach) { - return commandStores.stream(); + commandStores.forEach(scope, forEach); } - public Optional<CommandStore> local(Key key) + public <T> T mapReduceLocal(TxnRequest.Scope scope, Function<CommandStore, T> map, BiFunction<T, T, T> reduce) { - return local(Keys.of(key)).reduce((i1, i2) -> { - throw new IllegalStateException("more than one instance encountered for key"); - }); + return commandStores.mapReduce(scope, map, reduce); + } + + public <T extends Collection<CommandStore>> T collectLocal(Keys keys, IntFunction<T> factory) + { + return commandStores.collect(keys, factory); + } + + public <T extends Collection<CommandStore>> T collectLocal(TxnRequest.Scope scope, IntFunction<T> factory) + { + return commandStores.collect(scope, factory); } // send to every node besides ourselves @@ -284,8 +291,11 @@ public class Node implements ConfigurationService.Listener // TODO: The combination of updating the epoch of the next timestamp with epochs we don’t have topologies for, // and requiring preaccept to talk to its topology epoch means that learning of a new epoch via timestamp // (ie not via config service) will halt any new txns from a node until it receives this topology - if (txnId.epoch > configService.currentEpoch()) - return configService.fetchTopologyForEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn)); + if 
(txnId.epoch > topology().epoch()) + { + configService.fetchTopologyForEpoch(txnId.epoch); + return topology().awaitEpoch(txnId.epoch).flatMap(v -> coordinate(txnId, txn)); + } Future<Result> result = Coordinate.execute(this, txnId, txn); coordinating.put(txnId, result); @@ -309,9 +319,10 @@ public class Node implements ConfigurationService.Listener // TODO: encapsulate in Coordinate, so we can request that e.g. commits be re-sent? public void recover(TxnId txnId, Txn txn) { - if (txnId.epoch > configService.currentEpoch()) + if (txnId.epoch > topology.epoch()) { - configService.fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn)); + configService.fetchTopologyForEpoch(txnId.epoch); + topology().awaitEpoch(txnId.epoch).addListener(() -> recover(txnId, txn)); return; } @@ -319,12 +330,6 @@ public class Node implements ConfigurationService.Listener if (result != null) return; - if (txnId.epoch > topology().epoch()) - { - configService().fetchTopologyForEpoch(txnId.epoch).addListener(() -> recover(txnId, txn)); - return; - } - result = Coordinate.recover(this, txnId, txn); coordinating.putIfAbsent(txnId, result); // TODO (now): error handling @@ -343,7 +348,8 @@ public class Node implements ConfigurationService.Listener long unknownEpoch = topology().maxUnknownEpoch(request); if (unknownEpoch > 0) { - configService.fetchTopologyForEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext)); + configService.fetchTopologyForEpoch(unknownEpoch); + topology().awaitEpoch(unknownEpoch).addListener(() -> receive(request, from, replyContext)); return; } scheduler.now(() -> request.process(this, from, replyContext)); @@ -369,4 +375,11 @@ public class Node implements ConfigurationService.Listener { return "Node{" + id + '}'; } + + @VisibleForTesting + public CommandStore unsafeForKey(Key key) + { + return commandStores.unsafeForKey(key); + } + } diff --git a/accord-core/src/main/java/accord/messages/Accept.java 
b/accord-core/src/main/java/accord/messages/Accept.java index a9c581a..b9dc1c2 100644 --- a/accord-core/src/main/java/accord/messages/Accept.java +++ b/accord-core/src/main/java/accord/messages/Accept.java @@ -36,12 +36,12 @@ public class Accept extends TxnRequest public void process(Node on, Node.Id replyToNode, ReplyContext replyContext) { - on.reply(replyToNode, replyContext, on.local(scope()).map(instance -> { + on.reply(replyToNode, replyContext, on.mapReduceLocal(scope(), instance -> { Command command = instance.command(txnId); if (!command.accept(ballot, txn, executeAt, deps)) return new AcceptNack(txnId, command.promised()); return new AcceptOk(txnId, calculateDeps(instance, txnId, txn, executeAt)); - }).reduce((r1, r2) -> { + }, (r1, r2) -> { if (!r1.isOK()) return r1; if (!r2.isOK()) return r2; AcceptOk ok1 = (AcceptOk) r1; @@ -50,7 +50,7 @@ public class Accept extends TxnRequest if (ok2.deps.isEmpty()) return ok1; ok1.deps.addAll(ok2.deps); return ok1; - }).orElseThrow()); + })); } @Override diff --git a/accord-core/src/main/java/accord/messages/Apply.java b/accord-core/src/main/java/accord/messages/Apply.java index 7f296e6..a917359 100644 --- a/accord-core/src/main/java/accord/messages/Apply.java +++ b/accord-core/src/main/java/accord/messages/Apply.java @@ -38,7 +38,7 @@ public class Apply extends TxnRequest public void process(Node node, Id replyToNode, ReplyContext replyContext) { - node.local(scope()).forEach(instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result)); + node.forEachLocal(scope(), instance -> instance.command(txnId).apply(txn, deps, executeAt, writes, result)); } @Override diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java index 4d6e2e5..bf2d6b8 100644 --- a/accord-core/src/main/java/accord/messages/BeginRecovery.java +++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java @@ -43,7 +43,7 @@ public class BeginRecovery 
extends TxnRequest public void process(Node node, Id replyToNode, ReplyContext replyContext) { - RecoverReply reply = node.local(scope()).map(instance -> { + RecoverReply reply = node.mapReduceLocal(scope(), instance -> { Command command = instance.command(txnId); if (!command.recover(txn, ballot)) @@ -84,7 +84,7 @@ public class BeginRecovery extends TxnRequest .collect(Dependencies::new, Dependencies::add, Dependencies::addAll); } return new RecoverOk(txnId, command.status(), command.accepted(), command.executeAt(), deps, earlierCommittedWitness, earlierAcceptedNoWitness, rejectsFastPath, command.writes(), command.result()); - }).reduce((r1, r2) -> { + }, (r1, r2) -> { if (!r1.isOK()) return r1; if (!r2.isOK()) return r2; RecoverOk ok1 = (RecoverOk) r1; @@ -140,7 +140,7 @@ public class BeginRecovery extends TxnRequest ok1.earlierAcceptedNoWitness, ok1.rejectsFastPath | ok2.rejectsFastPath, ok1.writes, ok1.result); - }).orElseThrow(); + }); node.reply(replyToNode, replyContext, reply); if (reply instanceof RecoverOk && ((RecoverOk) reply).status == Applied) @@ -148,9 +148,10 @@ public class BeginRecovery extends TxnRequest // disseminate directly RecoverOk ok = (RecoverOk) reply; ConfigurationService configService = node.configService(); - if (ok.executeAt.epoch > configService.currentEpoch()) + if (ok.executeAt.epoch > node.topology().epoch()) { - configService.fetchTopologyForEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok)); + configService.fetchTopologyForEpoch(ok.executeAt.epoch); + node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok)); return; } disseminateApply(node, ok); @@ -160,6 +161,12 @@ public class BeginRecovery extends TxnRequest private void disseminateApply(Node node, RecoverOk ok) { Preconditions.checkArgument(ok.status == Applied); + if (ok.executeAt.epoch > node.epoch()) + { + node.configService().fetchTopologyForEpoch(ok.executeAt.epoch); + 
node.topology().awaitEpoch(ok.executeAt.epoch).addListener(() -> disseminateApply(node, ok)); + return; + } Topologies topologies = node.topology().forKeys(txn.keys, ok.executeAt.epoch); node.send(topologies.nodes(), to -> new Apply(to, topologies, txnId, txn, ok.executeAt, ok.deps, ok.writes, ok.result)); } diff --git a/accord-core/src/main/java/accord/messages/Commit.java b/accord-core/src/main/java/accord/messages/Commit.java index 57d13e2..8bf512d 100644 --- a/accord-core/src/main/java/accord/messages/Commit.java +++ b/accord-core/src/main/java/accord/messages/Commit.java @@ -28,7 +28,7 @@ public class Commit extends ReadData public void process(Node node, Id from, ReplyContext replyContext) { - node.local(scope()).forEach(instance -> instance.command(txnId).commit(txn, deps, executeAt)); + node.forEachLocal(scope(), instance -> instance.command(txnId).commit(txn, deps, executeAt)); if (read) super.process(node, from, replyContext); } diff --git a/accord-core/src/main/java/accord/messages/PreAccept.java b/accord-core/src/main/java/accord/messages/PreAccept.java index 0dcfe4b..cee9cff 100644 --- a/accord-core/src/main/java/accord/messages/PreAccept.java +++ b/accord-core/src/main/java/accord/messages/PreAccept.java @@ -33,7 +33,7 @@ public class PreAccept extends TxnRequest public void process(Node node, Id from, ReplyContext replyContext) { - node.reply(from, replyContext, node.local(scope()).map(instance -> { + node.reply(from, replyContext, node.mapReduceLocal(scope(), instance -> { // note: this diverges from the paper, in that instead of waiting for JoinShard, // we PreAccept to both old and new topologies and require quorums in both. // This necessitates sending to ALL replicas of old topology, not only electorate (as fast path may be unreachable). 
@@ -41,7 +41,7 @@ public class PreAccept extends TxnRequest if (!command.witness(txn)) return PreAcceptNack.INSTANCE; return new PreAcceptOk(txnId, command.executeAt(), calculateDeps(instance, txnId, txn, txnId)); - }).reduce((r1, r2) -> { + }, (r1, r2) -> { if (!r1.isOK()) return r1; if (!r2.isOK()) return r2; PreAcceptOk ok1 = (PreAcceptOk) r1; @@ -50,7 +50,7 @@ public class PreAccept extends TxnRequest if (ok1 != okMax && !ok1.deps.isEmpty()) okMax.deps.addAll(ok1.deps); if (ok2 != okMax && !ok2.deps.isEmpty()) okMax.deps.addAll(ok2.deps); return okMax; - }).orElseThrow()); + })); } @Override diff --git a/accord-core/src/main/java/accord/messages/ReadData.java b/accord-core/src/main/java/accord/messages/ReadData.java index e35d4bc..6c08b97 100644 --- a/accord-core/src/main/java/accord/messages/ReadData.java +++ b/accord-core/src/main/java/accord/messages/ReadData.java @@ -124,7 +124,7 @@ public class ReadData extends TxnRequest synchronized void setup(TxnId txnId, Txn txn, Scope scope) { // TODO: simple hash set supporting concurrent modification, or else avoid concurrent modification - waitingOn = node.local(scope).collect(Collectors.toCollection(() -> new DeterministicIdentitySet<>())); + waitingOn = node.collectLocal(scope, DeterministicIdentitySet::new); // FIXME: fix/check thread safety CommandStore.onEach(waitingOn, instance -> { Command command = instance.command(txnId); diff --git a/accord-core/src/main/java/accord/messages/TxnRequest.java b/accord-core/src/main/java/accord/messages/TxnRequest.java index 606825b..a41b56e 100644 --- a/accord-core/src/main/java/accord/messages/TxnRequest.java +++ b/accord-core/src/main/java/accord/messages/TxnRequest.java @@ -7,9 +7,6 @@ import accord.topology.Topology; import accord.txn.Keys; import accord.txn.Txn; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Objects; public abstract class TxnRequest implements Request @@ -32,85 +29,37 @@ public abstract class TxnRequest 
implements Request */ public static class Scope { - public static class KeysForEpoch - { - public final long epoch; - public final Keys keys; - - public KeysForEpoch(long epoch, Keys keys) - { - this.epoch = epoch; - this.keys = keys; - } - - static KeysForEpoch forTopology(Topology topology, Node.Id node, Keys keys) - { - KeyRanges topologyRanges = topology.rangesForNode(node); - if (topologyRanges == null) - return null; - topologyRanges = topologyRanges.intersection(keys); - Keys scopeKeys = keys.intersection(topologyRanges); - return !topologyRanges.isEmpty() ? new KeysForEpoch(topology.epoch(), scopeKeys) : null; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - KeysForEpoch that = (KeysForEpoch) o; - return epoch == that.epoch && keys.equals(that.keys); - } - - @Override - public int hashCode() - { - return Objects.hash(epoch, keys); - } - - @Override - public String toString() - { - return "EpochRanges{" + - "epoch=" + epoch + - ", keys=" + keys + - '}'; - } - } - - private final long maxEpoch; - private final KeysForEpoch[] epochs; - - public Scope(long maxEpoch, KeysForEpoch... 
epochKeys) - { - this.maxEpoch = maxEpoch; - this.epochs = epochKeys; - } - - public int size() - { - return epochs.length; - } + private final long minRequiredEpoch; + private final Keys keys; - public KeysForEpoch get(int i) + public Scope(long minRequiredEpoch, Keys keys) { - return epochs[i]; + this.minRequiredEpoch = minRequiredEpoch; + this.keys = keys; } - public static Scope forTopologies(Node.Id node, Topologies topologies, Keys keys) + public static Scope forTopologies(Node.Id node, Topologies topologies, Keys txnKeys) { - List<KeysForEpoch> ranges = new ArrayList<>(topologies.size()); + long minEpoch = 0; + Keys scopeKeys = Keys.EMPTY; + Keys lastKeys = null; for (int i=topologies.size() - 1; i>=0; i--) { Topology topology = topologies.get(i); - KeysForEpoch keysForEpoch = KeysForEpoch.forTopology(topology, node, keys); - if (keysForEpoch != null) + KeyRanges topologyRanges = topology.rangesForNode(node); + if (topologyRanges == null) + continue; + topologyRanges = topologyRanges.intersection(txnKeys); + Keys epochKeys = txnKeys.intersection(topologyRanges); + if (lastKeys == null || !lastKeys.containsAll(epochKeys)) { - ranges.add(keysForEpoch); + minEpoch = topology.epoch(); + scopeKeys = scopeKeys.merge(epochKeys); } + lastKeys = epochKeys; } - return new Scope(topologies.currentEpoch(), ranges.toArray(KeysForEpoch[]::new)); + return new Scope(minEpoch, scopeKeys); } public static Scope forTopologies(Node.Id node, Topologies topologies, Txn txn) @@ -118,27 +67,13 @@ public abstract class TxnRequest implements Request return forTopologies(node, topologies, txn.keys()); } - public long maxEpoch() - { - return maxEpoch; - } - - public boolean intersects(KeyRanges ranges) + public long minRequiredEpoch() { - for (KeysForEpoch keysForEpoch : this.epochs) - { - if (ranges.intersects(keysForEpoch.keys)) - return true; - } - - return false; + return minRequiredEpoch; } public Keys keys() { - Keys keys = epochs[0].keys; - for (int i = 1; i< epochs.length; i++) 
- keys = keys.merge(epochs[i].keys); return keys; } @@ -147,23 +82,22 @@ public abstract class TxnRequest implements Request { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - Scope that = (Scope) o; - return maxEpoch == that.maxEpoch && Arrays.equals(epochs, that.epochs); + Scope scope = (Scope) o; + return minRequiredEpoch == scope.minRequiredEpoch && keys.equals(scope.keys); } @Override public int hashCode() { - int result = Objects.hash(maxEpoch); - result = 31 * result + Arrays.hashCode(epochs); - return result; + return Objects.hash(minRequiredEpoch, keys); } @Override public String toString() { - return "TxnRequestScope{" + - "epochs=" + Arrays.toString(epochs) + + return "Scope{" + + "maxEpoch=" + minRequiredEpoch + + ", keys=" + keys + '}'; } } diff --git a/accord-core/src/main/java/accord/messages/WaitOnCommit.java b/accord-core/src/main/java/accord/messages/WaitOnCommit.java index a09324f..63aafb9 100644 --- a/accord-core/src/main/java/accord/messages/WaitOnCommit.java +++ b/accord-core/src/main/java/accord/messages/WaitOnCommit.java @@ -1,8 +1,8 @@ package accord.messages; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import accord.local.*; import accord.local.Node.Id; @@ -57,7 +57,7 @@ public class WaitOnCommit extends TxnRequest node.reply(replyToNode, replyContext, WaitOnCommitOk.INSTANCE); } - void process(CommandStore instance) + void setup(CommandStore instance) { Command command = instance.command(txnId); switch (command.status()) @@ -78,9 +78,9 @@ public class WaitOnCommit extends TxnRequest synchronized void setup(Keys keys) { - List<CommandStore> instances = node.local(keys).collect(Collectors.toList()); + List<CommandStore> instances = node.collectLocal(keys, ArrayList::new); waitingOn.set(instances.size()); - instances.forEach(instance -> instance.processBlocking(this::process)); + instances.forEach(instance -> 
instance.processBlocking(this::setup)); } } diff --git a/accord-core/src/main/java/accord/topology/KeyRanges.java b/accord-core/src/main/java/accord/topology/KeyRanges.java index 427a5e1..de2ce60 100644 --- a/accord-core/src/main/java/accord/topology/KeyRanges.java +++ b/accord-core/src/main/java/accord/topology/KeyRanges.java @@ -64,6 +64,11 @@ public class KeyRanges implements Iterable<KeyRange> return rangeIndexForKey(0, ranges.length, key); } + public boolean contains(Key key) + { + return rangeIndexForKey(key) >= 0; + } + public int size() { return ranges.length; diff --git a/accord-core/src/main/java/accord/topology/Topology.java b/accord-core/src/main/java/accord/topology/Topology.java index 8c88d32..dd25071 100644 --- a/accord-core/src/main/java/accord/topology/Topology.java +++ b/accord-core/src/main/java/accord/topology/Topology.java @@ -118,11 +118,6 @@ public class Topology extends AbstractCollection<Shard> return supersetRangeIndexes.length < shards.length; } - public boolean isSubsetOf(Topology topology) - { - return epoch() == topology.epoch() && Arrays.equals(this.shards, topology.shards); - } - public Topology withEpoch(long epoch) { return new Topology(epoch, shards, ranges, nodeLookup, subsetOfRanges, supersetRangeIndexes); @@ -192,7 +187,7 @@ public class Topology extends AbstractCollection<Shard> return forKeys(select, (i, shard) -> true); } - public <T> T accumulateForKeys(Keys select, IndexedBiFunction<Shard, T, T> function, T start) + public <T> T foldl(Keys select, IndexedBiFunction<Shard, T, T> function, T accumulator) { int subsetIndex = 0; for (int i = 0 ; i < select.size() ; ) @@ -203,11 +198,11 @@ public class Topology extends AbstractCollection<Shard> throw new IllegalArgumentException("Range not found for " + select.get(i)); int supersetIndex = supersetRangeIndexes[subsetIndex]; Shard shard = shards[supersetIndex]; - start = function.apply(subsetIndex, shard, start); + accumulator = function.apply(subsetIndex, shard, accumulator); // 
find the first key outside this range i = shard.range.higherKeyIndex(select, i, select.size()); } - return start; + return accumulator; } /** diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java b/accord-core/src/main/java/accord/topology/TopologyManager.java index cd3ed84..d3e9a92 100644 --- a/accord-core/src/main/java/accord/topology/TopologyManager.java +++ b/accord-core/src/main/java/accord/topology/TopologyManager.java @@ -9,6 +9,9 @@ import accord.txn.Keys; import accord.txn.Txn; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.apache.cassandra.utils.concurrent.AsyncPromise; +import org.apache.cassandra.utils.concurrent.Future; +import org.apache.cassandra.utils.concurrent.ImmediateFuture; import java.util.*; import java.util.function.LongConsumer; @@ -25,13 +28,14 @@ import java.util.function.LongConsumer; */ public class TopologyManager implements ConfigurationService.Listener { + private static final Future<Void> SUCCESS = ImmediateFuture.success(null); class EpochState { private final Topology global; private final Topology local; private final QuorumTracker syncTracker; private boolean syncComplete = false; - private boolean prevSynced = false; + private boolean prevSynced; EpochState(Topology global, boolean prevSynced) { @@ -39,8 +43,7 @@ public class TopologyManager implements ConfigurationService.Listener this.global = global; this.local = global.forNode(node); this.syncTracker = new QuorumTracker(new Topologies.Singleton(global, false)); - if (prevSynced) - markPrevSynced(); + this.prevSynced = prevSynced; } void markPrevSynced() @@ -73,7 +76,7 @@ public class TopologyManager implements ConfigurationService.Listener return false; if (syncComplete) return true; - Boolean result = global.accumulateForKeys(keys, (i, shard, acc) -> { + Boolean result = global.foldl(keys, (i, shard, acc) -> { if (acc == Boolean.FALSE) return acc; return 
Boolean.valueOf(syncTracker.unsafeGet(i).hasReachedQuorum()); @@ -97,10 +100,16 @@ public class TopologyManager implements ConfigurationService.Listener // until the superseding epoch has been applied private final List<Set<Node.Id>> pendingSyncComplete; - private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete) + // list of promises to be completed as newer epochs become active. This is to support processes that + // are waiting on future epochs to begin (ie: txn requests from futures epochs). Index 0 is for + // currentEpoch + 1 + private final List<AsyncPromise<Void>> futureEpochFutures; + + private Epochs(EpochState[] epochs, List<Set<Node.Id>> pendingSyncComplete, List<AsyncPromise<Void>> futureEpochFutures) { this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0; this.pendingSyncComplete = pendingSyncComplete; + this.futureEpochFutures = futureEpochFutures; for (int i=1; i<epochs.length; i++) Preconditions.checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1); this.epochs = epochs; @@ -108,7 +117,19 @@ public class TopologyManager implements ConfigurationService.Listener private Epochs(EpochState[] epochs) { - this(epochs, new ArrayList<>()); + this(epochs, new ArrayList<>(), new ArrayList<>()); + } + + public Future<Void> awaitEpoch(long epoch) + { + if (epoch <= currentEpoch) + return SUCCESS; + + int diff = (int) (epoch - currentEpoch); + while (futureEpochFutures.size() < diff) + futureEpochFutures.add(new AsyncPromise<>()); + + return futureEpochFutures.get(diff - 1); } public long nextEpoch() @@ -121,25 +142,6 @@ public class TopologyManager implements ConfigurationService.Listener return epochs.length > 0 ? 
epochs[0].global : Topology.EMPTY; } - public Epochs add(Topology topology) - { - Preconditions.checkArgument(topology.epoch == nextEpoch()); - EpochState[] nextEpochs = new EpochState[epochs.length + 1]; - List<Set<Node.Id>> pendingSync = new ArrayList<>(pendingSyncComplete); - if (!pendingSync.isEmpty()) - { - EpochState current = epochs[0]; - if (epochs.length <= 1 || epochs[1].syncComplete()) - current.markPrevSynced(); - pendingSync.remove(0).forEach(current::recordSyncComplete); - } - System.arraycopy(epochs, 0, nextEpochs, 1, epochs.length); - - boolean prevSynced = epochs.length == 0 || epochs[0].syncComplete(); - nextEpochs[0] = new EpochState(topology, prevSynced); - return new Epochs(nextEpochs, pendingSync); - } - /** * Mark sync complete for the given node/epoch, and if this epoch * is now synced, update the prevSynced flag on superseding epochs @@ -177,35 +179,8 @@ public class TopologyManager implements ConfigurationService.Listener long maxUnknownEpoch(TxnRequest.Scope scope) { - EpochState lastState = null; - for (int i=0, mi=scope.size(); i<mi; i++) - { - TxnRequest.Scope.KeysForEpoch requestRanges = scope.get(i); - EpochState epochState = get(requestRanges.epoch); - - if (epochState != null) - { - lastState = epochState; - } - else if (lastState != null && lastState.local.ranges().intersects(requestRanges.keys)) - { - // we don't have the most recent epoch, but still replicate the requested ranges - continue; - } - else - { - // we don't have the most recent epoch, and we don't replicate the requested ranges - return scope.maxEpoch(); - } - - // validate requested ranges - KeyRanges localRanges = epochState.local.ranges(); - if (!localRanges.intersects(requestRanges.keys)) - throw new RuntimeException("Received request for ranges not replicated by this node"); - } - if (scope.maxEpoch() > 0) - epochReporter.accept(scope.maxEpoch()); - + if (currentEpoch < scope.minRequiredEpoch()) + return scope.minRequiredEpoch(); return 0; } @@ -229,7 +204,33 
@@ public class TopologyManager implements ConfigurationService.Listener @Override public synchronized void onTopologyUpdate(Topology topology) { - epochs = epochs.add(topology); + Epochs current = epochs; + + Preconditions.checkArgument(topology.epoch == current.nextEpoch()); + EpochState[] nextEpochs = new EpochState[current.epochs.length + 1]; + List<Set<Node.Id>> pendingSync = new ArrayList<>(current.pendingSyncComplete); + if (!pendingSync.isEmpty()) + { + EpochState currentEpoch = current.epochs[0]; + if (current.epochs.length <= 1 || current.epochs[1].syncComplete()) + currentEpoch.markPrevSynced(); + pendingSync.remove(0).forEach(currentEpoch::recordSyncComplete); + } + System.arraycopy(current.epochs, 0, nextEpochs, 1, current.epochs.length); + + boolean prevSynced = current.epochs.length == 0 || current.epochs[0].syncComplete(); + nextEpochs[0] = new EpochState(topology, prevSynced); + + List<AsyncPromise<Void>> futureEpochFutures = new ArrayList<>(current.futureEpochFutures); + AsyncPromise<Void> toComplete = !futureEpochFutures.isEmpty() ? 
futureEpochFutures.remove(0) : null; + epochs = new Epochs(nextEpochs, pendingSync, futureEpochFutures); + if (toComplete != null) + toComplete.trySuccess(null); + } + + public synchronized Future<Void> awaitEpoch(long epoch) + { + return epochs.awaitEpoch(epoch); } @Override diff --git a/accord-core/src/main/java/accord/txn/Keys.java b/accord-core/src/main/java/accord/txn/Keys.java index cf6388b..f5bfc97 100644 --- a/accord-core/src/main/java/accord/txn/Keys.java +++ b/accord-core/src/main/java/accord/txn/Keys.java @@ -8,7 +8,6 @@ import java.util.stream.Stream; import accord.api.Key; import accord.api.KeyRange; import accord.topology.KeyRanges; -import com.google.common.base.Preconditions; @SuppressWarnings("rawtypes") public class Keys implements Iterable<Key> @@ -67,6 +66,42 @@ public class Keys implements Iterable<Key> return new Keys(selection); } + /** + * return true if this keys collection contains all keys found in the given keys + */ + public boolean containsAll(Keys that) + { + if (isEmpty()) + return that.isEmpty(); + + for (int thisIdx=0, thatIdx=0, thisSize=size(), thatSize=that.size(); + thatIdx<thatSize; + thisIdx++, thatIdx++) + { + if (thisIdx >= thisSize) + return false; + + Key thatKey = that.keys[thatIdx]; + Key thisKey = this.keys[thisIdx]; + int cmp = thisKey.compareTo(thatKey); + + if (cmp == 0) + continue; + + // if this key is greater that that key, we can't contain that key + if (cmp > 0) + return false; + + // if search returns a positive index, a match was found and + // no further comparison is needed + thisIdx = Arrays.binarySearch(keys, thisIdx, thisSize, thatKey); + if (thisIdx < 0) + return false; + } + + return true; + } + public Keys merge(Keys that) { int thisIdx = 0; @@ -225,20 +260,16 @@ public class Keys implements Iterable<Key> return result != null ? 
new Keys(result) : EMPTY; } - public interface KeyAccumulator<V> + public interface KeyFold<V> { - V accumulate(Key key, V value); - default boolean isDone() - { - return false; - } + V fold(Key key, V value); } /** * Count the number of keys matching the predicate and intersecting with the given ranges. * If terminateAfter is greater than 0, the method will return once terminateAfter matches are encountered */ - public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator, V value) + public <V> V foldl(KeyRanges ranges, KeyFold<V> fold, V accumulator) { int keyLB = 0; int keyHB = size(); @@ -249,7 +280,7 @@ public class Keys implements Iterable<Key> for (;rangeLB<rangeHB && keyLB<keyHB;) { Key key = keys[keyLB]; - rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key); + rangeLB = ranges.rangeIndexForKey(rangeLB, rangeHB, key); if (rangeLB < 0) { @@ -264,11 +295,7 @@ public class Keys implements Iterable<Key> int highKey = range.higherKeyIndex(this, keyLB, keyHB); for (int i=keyLB; i<highKey; i++) - { - value = accumulator.accumulate(keys[i], value); - if (accumulator.isDone()) - return value; - } + accumulator = fold.fold(keys[i], accumulator); keyLB = highKey; rangeLB++; @@ -278,43 +305,59 @@ public class Keys implements Iterable<Key> keyLB = -1 - keyLB; } - return value; + return accumulator; } - public <V> V accumulate(KeyRanges ranges, KeyAccumulator<V> accumulator) + public boolean any(KeyRanges ranges, Predicate<Key> predicate) { - return accumulate(ranges, accumulator, null); + return 1 == foldl(ranges, (key, i1, i2) -> predicate.test(key) ? 
1 : 0, 0, 0, 1); } - private static class TerminatingKeyAccumulator<V> implements KeyAccumulator<V> + public interface FoldKeysToLong { - private boolean isDone = false; - private final Predicate<Key> predicate; + long apply(Key key, long param, long prev); + } - public TerminatingKeyAccumulator(Predicate<Key> predicate) - { - this.predicate = predicate; - } + public long foldl(KeyRanges ranges, FoldKeysToLong fold, long param, long initialValue, long terminalValue) + { + int keyLB = 0; + int keyHB = size(); + int rangeLB = 0; + int rangeHB = ranges.rangeIndexForKey(keys[keyHB-1]); + rangeHB = rangeHB < 0 ? -1 - rangeHB : rangeHB + 1; - @Override - final public V accumulate(Key key, V value) + for (;rangeLB<rangeHB && keyLB<keyHB;) { - Preconditions.checkState(!isDone); - isDone = predicate.test(key); - return value; - } + Key key = keys[keyLB]; + rangeLB = ranges.rangeIndexForKey(rangeLB, ranges.size(), key); - @Override - final public boolean isDone() - { - return isDone; + if (rangeLB < 0) + { + rangeLB = -1 -rangeLB; + if (rangeLB >= rangeHB) + break; + keyLB = ranges.get(rangeLB).lowKeyIndex(this, keyLB, keyHB); + } + else + { + KeyRange<?> range = ranges.get(rangeLB); + int highKey = range.higherKeyIndex(this, keyLB, keyHB); + + for (int i=keyLB; i<highKey; i++) + { + initialValue = fold.apply(keys[i], param, initialValue); + if (terminalValue == initialValue) + return initialValue; + } + + keyLB = highKey; + rangeLB++; + } + + if (keyLB < 0) + keyLB = -1 - keyLB; } - } - public boolean any(KeyRanges ranges, Predicate<Key> predicate) - { - TerminatingKeyAccumulator<Void> accumulator = new TerminatingKeyAccumulator<>(predicate); - accumulate(ranges, accumulator, null); - return accumulator.isDone(); + return initialValue; } } diff --git a/accord-core/src/main/java/accord/txn/Txn.java b/accord-core/src/main/java/accord/txn/Txn.java index 9466a1c..ae2619d 100644 --- a/accord-core/src/main/java/accord/txn/Txn.java +++ 
b/accord-core/src/main/java/accord/txn/Txn.java @@ -6,7 +6,6 @@ import java.util.stream.Stream; import accord.api.*; import accord.local.*; -import accord.topology.KeyRanges; public class Txn { @@ -90,14 +89,14 @@ public class Txn public Data read(Command command, Keys keyScope) { - return keyScope.accumulate(command.commandStore.ranges(), (key, accumulate) -> { + return keyScope.foldl(command.commandStore.ranges(), (key, accumulate) -> { CommandStore commandStore = command.commandStore; if (!commandStore.hashIntersects(key)) return accumulate; Data result = read.read(key, command.executeAt(), commandStore.store()); return accumulate != null ? accumulate.merge(result) : result; - }); + }, null); } public Timestamp maxConflict(CommandStore commandStore) diff --git a/accord-core/src/main/java/accord/txn/Writes.java b/accord-core/src/main/java/accord/txn/Writes.java index 019ed8b..67bc51c 100644 --- a/accord-core/src/main/java/accord/txn/Writes.java +++ b/accord-core/src/main/java/accord/txn/Writes.java @@ -21,11 +21,11 @@ public class Writes if (write == null) return; - keys.accumulate(commandStore.ranges(), (key, accumulate) -> { + keys.foldl(commandStore.ranges(), (key, accumulate) -> { if (commandStore.hashIntersects(key)) write.apply(key, executeAt, commandStore.store()); return accumulate; - }); + }, null); } @Override diff --git a/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java b/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java index b05b62a..90ba39d 100644 --- a/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java +++ b/accord-core/src/main/java/accord/utils/DeterministicIdentitySet.java @@ -21,12 +21,18 @@ public class DeterministicIdentitySet<T> extends AbstractSet<T> } // TODO: an identity hash map that doesn't mind concurrent modification / iteration - final IdentityHashMap<T, Entry<T>> lookup = new IdentityHashMap<>(); + final IdentityHashMap<T, Entry<T>> lookup; final Entry<T> head = new 
Entry<T>(null); public DeterministicIdentitySet() + { + this(0); + } + + public DeterministicIdentitySet(int size) { head.prev = head.next = head; + lookup = new IdentityHashMap<>(size); } @Override diff --git a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java index 1ba217f..30b5cbb 100644 --- a/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java +++ b/accord-core/src/main/java/accord/utils/ThreadPoolScheduler.java @@ -5,6 +5,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import accord.api.Scheduler; +import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +78,7 @@ public class ThreadPoolScheduler implements Scheduler } catch (InterruptedException e) { - throw new IllegalStateException(e); + throw new UncheckedInterruptedException(e); } } } diff --git a/accord-core/src/test/java/accord/KeysTest.java b/accord-core/src/test/java/accord/KeysTest.java index 03f8bfd..1f06656 100644 --- a/accord-core/src/test/java/accord/KeysTest.java +++ b/accord-core/src/test/java/accord/KeysTest.java @@ -1,8 +1,15 @@ package accord; +import java.util.ArrayList; +import java.util.List; + +import accord.api.Key; import accord.api.KeyRange; import accord.impl.IntKey; import accord.topology.KeyRanges; +import accord.txn.Keys; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import static accord.impl.IntKey.keys; @@ -19,6 +26,7 @@ public class KeysTest { return new KeyRanges(ranges); } + @Test void intersectionTest() { @@ -43,4 +51,48 @@ public class KeysTest assertEquals(keys(0, 1, 2, 3, 4), keys(0, 2, 4).merge(keys(1, 3))); } + + @Test + void foldlTest() + { + List<Key> keys = new ArrayList<>(); + long result = keys(150, 250, 350, 450, 550).foldl(ranges(r(200, 400)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); 
+ assertEquals(16, result); + assertEquals(keys(250, 350), new Keys(keys)); + + keys.clear(); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 500)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + assertEquals(3616, result); + assertEquals(keys(150, 250, 350, 450), new Keys(keys)); + + keys.clear(); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(500, 1000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + assertEquals(1, result); + assertEquals(keys(550), new Keys(keys)); + + keys.clear(); + result = keys(150, 250, 350, 450, 550).foldl(ranges(r(0, 20), r(100, 140), r(149, 151), r(560, 2000)), (key, p, v) -> { keys.add(key); return v * p + 1; }, 15, 0, -1); + assertEquals(1, result); + assertEquals(keys(150), new Keys(keys)); + } + + @Test + void containsAll() + { + Keys keys = keys(150, 200, 250, 300, 350); + Assertions.assertTrue(keys.containsAll(keys(150, 200))); + Assertions.assertTrue(keys.containsAll(keys(150, 250))); + Assertions.assertTrue(keys.containsAll(keys(200, 250))); + Assertions.assertTrue(keys.containsAll(keys(200, 300))); + Assertions.assertTrue(keys.containsAll(keys(250, 300))); + Assertions.assertTrue(keys.containsAll(keys(250, 350))); + + Assertions.assertFalse(keys.containsAll(keys(100, 150))); + Assertions.assertFalse(keys.containsAll(keys(100, 250))); + Assertions.assertFalse(keys.containsAll(keys(200, 225))); + Assertions.assertFalse(keys.containsAll(keys(225, 300))); + Assertions.assertFalse(keys.containsAll(keys(250, 235))); + Assertions.assertFalse(keys.containsAll(keys(250, 400))); + + } } diff --git a/accord-core/src/test/java/accord/burn/BurnTest.java b/accord-core/src/test/java/accord/burn/BurnTest.java index a516bb2..0501417 100644 --- a/accord-core/src/test/java/accord/burn/BurnTest.java +++ b/accord-core/src/test/java/accord/burn/BurnTest.java @@ -247,7 +247,7 @@ public class BurnTest } catch (Throwable t) { - logger.error("Exception running burn test:", t); + 
logger.error("Exception running burn test for seed {}:", seed, t); throw t; } } while (overrideSeed == null); diff --git a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java index e7215f2..dae12e9 100644 --- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java +++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java @@ -6,9 +6,9 @@ import accord.api.TestableConfigurationService; import accord.local.Node; import accord.messages.*; import accord.topology.Topology; +import com.google.common.base.Preconditions; import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.Future; -import org.apache.cassandra.utils.concurrent.ImmediateFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,27 +16,104 @@ import java.util.*; import java.util.function.Function; import java.util.function.Supplier; -// TODO: merge with MockConfigurationService? 
public class BurnTestConfigurationService implements TestableConfigurationService { private static final Logger logger = LoggerFactory.getLogger(BurnTestConfigurationService.class); - private static final Future<Void> SUCCESS = ImmediateFuture.success(null); private final Node.Id node; private final MessageSink messageSink; private final Function<Node.Id, Node> lookup; private final Supplier<Random> randomSupplier; - private final List<Topology> epochs = new ArrayList<>(); + private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>(); + + private final EpochHistory epochs = new EpochHistory(); private final List<ConfigurationService.Listener> listeners = new ArrayList<>(); + private static class EpochState + { + private final long epoch; + private final AsyncPromise<Topology> received = new AsyncPromise<>(); + private final AsyncPromise<Void> acknowledged = new AsyncPromise<>(); + private final AsyncPromise<Void> synced = new AsyncPromise<>(); + + private Topology topology = null; + + public EpochState(long epoch) + { + this.epoch = epoch; + } + } + + private static class EpochHistory + { + // TODO: move pendingEpochs / FetchTopology into here? 
+ private final List<EpochState> epochs = new ArrayList<>(); + + private long lastReceived = 0; + private long lastAcknowledged = 0; + private long lastSyncd = 0; + + private EpochState get(long epoch) + { + for (long addEpoch = epochs.size() - 1; addEpoch <= epoch; addEpoch++) + epochs.add(new EpochState(addEpoch)); + return epochs.get((int) epoch); + } + + EpochHistory receive(Topology topology) + { + long epoch = topology.epoch(); + Preconditions.checkState(epoch == 0 || lastReceived == epoch - 1); + lastReceived = epoch; + EpochState state = get(epoch); + state.topology = topology; + state.received.setSuccess(topology); + return this; + } + + Future<Topology> receiveFuture(long epoch) + { + return get(epoch).received; + } + + Topology topologyFor(long epoch) + { + return get(epoch).topology; + } + + EpochHistory acknowledge(long epoch) + { + Preconditions.checkState(epoch == 0 || lastAcknowledged == epoch - 1); + lastAcknowledged = epoch; + get(epoch).acknowledged.setSuccess(null); + return this; + } + + Future<Void> acknowledgeFuture(long epoch) + { + return get(epoch).acknowledged; + } + + EpochHistory syncComplete(long epoch) + { + Preconditions.checkState(epoch == 0 || lastSyncd == epoch - 1); + EpochState state = get(epoch); + Preconditions.checkState(state.received.isDone()); + Preconditions.checkState(state.acknowledged.isDone()); + lastSyncd = epoch; + get(epoch).synced.setSuccess(null); + return this; + } + } + public BurnTestConfigurationService(Node.Id node, MessageSink messageSink, Supplier<Random> randomSupplier, Topology topology, Function<Node.Id, Node> lookup) { this.node = node; this.messageSink = messageSink; this.randomSupplier = randomSupplier; this.lookup = lookup; - epochs.add(Topology.EMPTY); - epochs.add(topology); + epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0); + epochs.receive(topology).acknowledge(1).syncComplete(1); } @Override @@ -48,13 +125,13 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic @Override public synchronized Topology currentTopology() { - return epochs.get(epochs.size() - 1); + return epochs.topologyFor(epochs.lastReceived); } @Override public synchronized Topology getTopologyForEpoch(long epoch) { - return epoch >= epochs.size() ? null : epochs.get((int) epoch); + return epochs.topologyFor(epoch); } private static class FetchTopologyRequest implements Request @@ -114,8 +191,6 @@ public class BurnTestConfigurationService implements TestableConfigurationServic private final FetchTopologyRequest request; private final List<Node.Id> candidates; - private final Set<Runnable> onComplete = new HashSet<>(); - public FetchTopology(long epoch) { this.request = new FetchTopologyRequest(epoch); @@ -151,23 +226,19 @@ public class BurnTestConfigurationService implements TestableConfigurationServic } } - private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>(); - @Override - public synchronized Future<Void> fetchTopologyForEpoch(long epoch) + public synchronized void fetchTopologyForEpoch(long epoch) { - if (epoch < epochs.size()) - { - return SUCCESS; - } + if (epoch <= epochs.lastReceived) + return; - FetchTopology fetch = pendingEpochs.computeIfAbsent(epoch, FetchTopology::new); - return fetch; + pendingEpochs.computeIfAbsent(epoch, FetchTopology::new); } @Override - public void acknowledgeEpoch(long epoch) + public synchronized void acknowledgeEpoch(long epoch) { + epochs.acknowledge(epoch); Topology topology = getTopologyForEpoch(epoch); Node originator = lookup.apply(node); TopologyUpdate.syncEpoch(originator, epoch - 1, topology.nodes()); @@ -176,18 +247,26 @@ public class BurnTestConfigurationService implements TestableConfigurationServic @Override public synchronized void reportTopology(Topology topology) { - if (topology.epoch() < epochs.size()) + long lastReceived = epochs.lastReceived; + if (topology.epoch() <= lastReceived) return; - if (topology.epoch() > epochs.size()) + if (topology.epoch() > 
lastReceived + 1) { - fetchTopologyForEpoch(epochs.size() + 1).addListener(() -> reportTopology(topology)); + fetchTopologyForEpoch(lastReceived + 1); + epochs.receiveFuture(lastReceived + 1).addListener(() -> reportTopology(topology)); return; } - logger.trace("Epoch {} received by {}", topology.epoch(), node); - epochs.add(topology); + long lastAcked = epochs.lastAcknowledged; + if (topology.epoch() > lastAcked + 1) + { + epochs.acknowledgeFuture(lastAcked + 1).addListener(() -> reportTopology(topology)); + return; + } + logger.trace("Epoch {} received by {}", topology.epoch(), node); + epochs.receive(topology); for (Listener listener : listeners) listener.onTopologyUpdate(topology); @@ -197,5 +276,4 @@ public class BurnTestConfigurationService implements TestableConfigurationServic fetch.setSuccess(null); } - } diff --git a/accord-core/src/test/java/accord/burn/TopologyUpdate.java b/accord-core/src/test/java/accord/burn/TopologyUpdate.java index 474c522..ae22d60 100644 --- a/accord-core/src/test/java/accord/burn/TopologyUpdate.java +++ b/accord-core/src/test/java/accord/burn/TopologyUpdate.java @@ -31,28 +31,7 @@ public class TopologyUpdate { private static final Logger logger = LoggerFactory.getLogger(TopologyUpdate.class); - private static class CountDownFuture<T> extends AsyncPromise<T> - { - private final AtomicInteger remaining; - private final T value; - - public CountDownFuture(int count, T value) - { - this.remaining = new AtomicInteger(count); - this.value = value; - } - - public CountDownFuture(int count) - { - this(count, null); - } - - public void countDown() - { - if (remaining.decrementAndGet() == 0) - setSuccess(value); - } - } + private static final Set<Long> pendingTopologies = Sets.newConcurrentHashSet(); private static class CommandSync { @@ -78,7 +57,7 @@ public class TopologyUpdate } public void process(Node node) { - node.local(txn.keys()).forEach(commandStore -> { + node.forEachLocal(txn, commandStore -> { switch (status) { case 
PreAccepted: @@ -118,18 +97,9 @@ public class TopologyUpdate return stage.addCallback(dieOnException()); } - private static <T> Future<Void> map(Collection<T> items, Function<T, Future<Void>> function) - { - CountDownFuture<Void> latch = new CountDownFuture<>(items.size()); - for (T item : items) - { - function.apply(item).addListener(latch::countDown); - } - return latch; - } - public static MessageTask notify(Node originator, Collection<Node.Id> cluster, Topology update) { + pendingTopologies.add(update.epoch()); return MessageTask.begin(originator, cluster, "TopologyNotify:" + update.epoch(), (node, from) -> { long nodeEpoch = node.topology().epoch(); if (nodeEpoch + 1 < update.epoch()) @@ -139,11 +109,6 @@ public class TopologyUpdate }); } - private static Future<Void> broadcast(List<Node.Id> cluster, Function<Node.Id, Node> lookup, String desc, BiConsumer<Node, Node.Id> process) - { - return map(cluster, node -> MessageTask.apply(lookup.apply(node), cluster, desc, process)); - } - private static Collection<Node.Id> allNodesFor(Txn txn, Topology... 
topologies) { Set<Node.Id> result = new HashSet<>(); @@ -157,9 +122,9 @@ public class TopologyUpdate Map<TxnId, CommandSync> syncMessages = new ConcurrentHashMap<>(); Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new CommandSync(command)); if (committedOnly) - node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer)); + node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(ranges, epoch, commandConsumer)); else - node.local().forEach(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer)); + node.forEachLocal(commandStore -> commandStore.forEpochCommands(ranges, epoch, commandConsumer)); return syncMessages.values().stream().map(cmd -> MessageTask.of(node, recipients.apply(cmd), "Sync:" + cmd.txnId + ':' + epoch + ':' + forEpoch, cmd::process)); } @@ -211,6 +176,10 @@ public class TopologyUpdate continue; Set<Node.Id> newNodes = Sets.difference(nextShard.nodeSet, syncShard.nodeSet); + + if (newNodes.isEmpty()) + continue; + KeyRanges ranges = KeyRanges.singleton(intersection); for (long epoch=1; epoch<syncEpoch; epoch++) messageStream = Stream.concat(messageStream, syncEpochCommands(node, @@ -254,20 +223,16 @@ public class TopologyUpdate return dieExceptionally(last); } - public static void update(Node originator, Topology update, List<Node.Id> cluster, Function<Node.Id, Node> lookup) + public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster) { - long epoch = update.epoch(); - // notify - dieExceptionally(notify(originator, cluster, update) - // sync operations - .flatMap(v -> map(cluster, node -> sync(lookup.apply(node), epoch - 1))) - // inform sync complete - .flatMap(v -> broadcast(cluster, lookup, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(from, epoch)))); + Future<Void> future = dieExceptionally(sync(originator, epoch) + .flatMap(v -> MessageTask.apply(originator, cluster, 
"SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch)))); + future.addCallback((unused, throwable) -> pendingTopologies.remove(epoch)); + return future; } - public static Future<Void> syncEpoch(Node originator, long epoch, Collection<Node.Id> cluster) + public static int pendingTopologies() { - return dieExceptionally(sync(originator, epoch) - .flatMap(v -> MessageTask.apply(originator, cluster, "SyncComplete:" + epoch, (node, from) -> node.onEpochSyncComplete(originator.id(), epoch)))); + return pendingTopologies.size(); } } diff --git a/accord-core/src/test/java/accord/coordinate/RecoverTest.java b/accord-core/src/test/java/accord/coordinate/RecoverTest.java index 68e7f02..67b1405 100644 --- a/accord-core/src/test/java/accord/coordinate/RecoverTest.java +++ b/accord-core/src/test/java/accord/coordinate/RecoverTest.java @@ -25,7 +25,7 @@ public class RecoverTest { private static CommandStore getCommandShard(Node node, Key key) { - return node.local(key).orElseThrow(); + return node.unsafeForKey(key); } private static Command getCommand(Node node, Key key, TxnId txnId) diff --git a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java index 5a8e210..de05398 100644 --- a/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java +++ b/accord-core/src/test/java/accord/coordinate/TopologyChangeTest.java @@ -55,7 +55,7 @@ public class TopologyChangeTest TxnId txnId1 = node1.nextTxnId(); Txn txn1 = writeTxn(keys); node1.coordinate(txnId1, txn1).get(); - node1.local(keys).forEach(commands -> { + node1.forEachLocal(keys, commands -> { Command command = commands.command(txnId1); Assertions.assertTrue(command.savedDeps().isEmpty()); }); @@ -69,20 +69,15 @@ public class TopologyChangeTest // new nodes should have the previous epochs operation as a dependency cluster.nodes(4, 5, 6).forEach(node -> { - node.local(keys).forEach(commands -> { + 
node.forEachLocal(keys, commands -> { Command command = commands.command(txnId2); Assertions.assertTrue(command.savedDeps().contains(txnId1)); }); }); - // old nodes should be aware of the new epoch - cluster.configServices(1, 2, 3).forEach(config -> { - Assertions.assertEquals(2, config.currentEpoch()); - }); - // ...and participated in consensus cluster.nodes(1, 2, 3).forEach(node -> { - node.local(keys).forEach(commands -> { + node.forEachLocal(keys, commands -> { Command command = commands.command(txnId2); Assertions.assertTrue(command.hasBeen(Status.Committed)); }); @@ -105,7 +100,7 @@ public class TopologyChangeTest RecordingMessageSink messageSink = (RecordingMessageSink) node1.messageSink(); messageSink.clearHistory(); TxnId txnId1 = coordinate(node1, keys); - node1.local(keys).forEach(commands -> { + node1.forEachLocal(keys, commands -> { Command command = commands.command(txnId1); Assertions.assertTrue(command.savedDeps().isEmpty()); }); @@ -127,7 +122,7 @@ public class TopologyChangeTest }).collect(Collectors.toSet()); Assertions.assertEquals(idSet(1, 2, 3), accepts); - node1.local(keys).forEach(commands -> { + node1.forEachLocal(keys, commands -> { Command command = commands.command(txnId2); Assertions.assertTrue(command.hasBeen(Status.Committed)); Assertions.assertTrue(command.savedDeps().contains(txnId1)); @@ -140,7 +135,7 @@ public class TopologyChangeTest messageSink.clearHistory(); TxnId txnId3 = coordinate(node1, keys); Assertions.assertFalse(messageSink.requests.stream().anyMatch(env -> env.payload instanceof Accept)); - node1.local(keys).forEach(commands -> { + node1.forEachLocal(keys, commands -> { Command command = commands.command(txnId3); Assertions.assertTrue(command.hasBeen(Status.Committed)); Assertions.assertTrue(command.savedDeps().contains(txnId1)); diff --git a/accord-core/src/test/java/accord/impl/basic/Cluster.java b/accord-core/src/test/java/accord/impl/basic/Cluster.java index d9f5914..2a1429d 100644 --- 
a/accord-core/src/test/java/accord/impl/basic/Cluster.java +++ b/accord-core/src/test/java/accord/impl/basic/Cluster.java @@ -17,7 +17,7 @@ import java.util.function.Supplier; import accord.api.MessageSink; import accord.burn.BurnTestConfigurationService; -import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.Node.Id; import accord.api.Scheduler; @@ -96,10 +96,10 @@ public class Cluster implements Scheduler || !partitionSet.contains(deliver.src) && !partitionSet.contains(deliver.dst)); if (drop) { - logger.trace("{} DROP[{}] {}", clock++, on.epoch(), deliver); + logger.debug("{} DROP[{}] {}", clock++, on.epoch(), deliver); return true; } - logger.trace("{} RECV[{}] {}", clock++, on.epoch(), deliver); + logger.debug("{} RECV[{}] {}", clock++, on.epoch(), deliver); if (deliver.message instanceof Reply) { Reply reply = (Reply) deliver.message; @@ -153,7 +153,7 @@ public class Cluster implements Scheduler MessageSink messageSink = sinks.create(node, randomSupplier.get()); BurnTestConfigurationService configService = new BurnTestConfigurationService(node, messageSink, randomSupplier, topology, lookup::get); lookup.put(node, new Node(node, messageSink, configService, - nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStore.Factory.SYNCHRONIZED)); + nowSupplier.get(), () -> new ListStore(node), ListAgent.INSTANCE, sinks, CommandStores.Synchronized::new)); } List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes)); diff --git a/accord-core/src/test/java/accord/impl/list/ListData.java b/accord-core/src/test/java/accord/impl/list/ListData.java index 35ef8d5..3008de6 100644 --- a/accord-core/src/test/java/accord/impl/list/ListData.java +++ b/accord-core/src/test/java/accord/impl/list/ListData.java @@ -1,6 +1,9 @@ package accord.impl.list; +import java.util.Arrays; +import java.util.Map; import java.util.TreeMap; +import java.util.stream.Collectors; import accord.api.Data; import 
accord.api.Key; @@ -10,7 +13,16 @@ public class ListData extends TreeMap<Key, int[]> implements Data @Override public Data merge(Data data) { - this.putAll(((ListData)data)); + if (data != null) + this.putAll(((ListData)data)); return this; } + + @Override + public String toString() + { + return entrySet().stream() + .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue())) + .collect(Collectors.joining(", ", "{", "}")); + } } diff --git a/accord-core/src/test/java/accord/impl/list/ListWrite.java b/accord-core/src/test/java/accord/impl/list/ListWrite.java index 29cc6ab..4be25f3 100644 --- a/accord-core/src/test/java/accord/impl/list/ListWrite.java +++ b/accord-core/src/test/java/accord/impl/list/ListWrite.java @@ -1,6 +1,8 @@ package accord.impl.list; +import java.util.Arrays; import java.util.TreeMap; +import java.util.stream.Collectors; import accord.api.Key; import accord.api.Store; @@ -23,4 +25,12 @@ public class ListWrite extends TreeMap<Key, int[]> implements Write s.data.merge(key, new Timestamped<>(executeAt, data), Timestamped::merge); logger.trace("WRITE on {} at {} key:{} -> {}", s.node, executeAt, key, data); } + + @Override + public String toString() + { + return entrySet().stream() + .map(e -> e.getKey() + "=" + Arrays.toString(e.getValue())) + .collect(Collectors.joining(", ", "{", "}")); + } } diff --git a/accord-core/src/test/java/accord/impl/mock/EpochSync.java b/accord-core/src/test/java/accord/impl/mock/EpochSync.java index 71914dd..80ad2d2 100644 --- a/accord-core/src/test/java/accord/impl/mock/EpochSync.java +++ b/accord-core/src/test/java/accord/impl/mock/EpochSync.java @@ -52,7 +52,7 @@ public class EpochSync implements Runnable @Override public void process(Node node, Node.Id from, ReplyContext replyContext) { - node.local().forEach(commandStore -> { + node.forEachLocal(commandStore -> { Command command = commandStore.command(txnId); command.commit(txn, deps, executeAt); }); @@ -158,7 +158,7 @@ public class EpochSync implements Runnable { 
Map<TxnId, SyncMessage> syncMessages = new ConcurrentHashMap<>(); Consumer<Command> commandConsumer = command -> syncMessages.put(command.txnId(), new SyncMessage(command)); - node.local().forEach(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer)); + node.forEachLocal(commandStore -> commandStore.forCommittedInEpoch(syncTopology.ranges(), syncEpoch, commandConsumer)); for (SyncMessage message : syncMessages.values()) CommandSync.sync(node, message, nextTopology); diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java b/accord-core/src/test/java/accord/impl/mock/MockCluster.java index 8e00794..0a70760 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java +++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java @@ -4,7 +4,7 @@ import accord.NetworkFilter; import accord.api.MessageSink; import accord.coordinate.Timeout; import accord.impl.TopologyUtils; -import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.Node.Id; import accord.topology.KeyRanges; @@ -24,7 +24,6 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.LongSupplier; import static accord.Utils.*; @@ -90,7 +89,7 @@ public class MockCluster implements Network, AutoCloseable, Iterable<Node> () -> store, new TestAgent(), new ThreadPoolScheduler(), - CommandStore.Factory.SINGLE_THREAD); + CommandStores.SingleThread::new); } private void init(Topology topology) diff --git a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java index 74add97..d0208ee 100644 --- a/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java +++ 
b/accord-core/src/test/java/accord/impl/mock/MockConfigurationService.java @@ -18,7 +18,6 @@ public class MockConfigurationService implements TestableConfigurationService private final MessageSink messageSink; private final List<Topology> epochs = new ArrayList<>(); private final List<Listener> listeners = new ArrayList<>(); - private final Map<Long, AsyncPromise<Void>> pending = new HashMap<>(); private final EpochFunction<MockConfigurationService> fetchTopologyHandler; public MockConfigurationService(MessageSink messageSink, EpochFunction<MockConfigurationService> fetchTopologyHandler) @@ -53,16 +52,13 @@ public class MockConfigurationService implements TestableConfigurationService } @Override - public synchronized Future<Void> fetchTopologyForEpoch(long epoch) + public synchronized void fetchTopologyForEpoch(long epoch) { if (epoch < epochs.size()) - { - return SUCCESS; - } + return; - Future<Void> future = pending.computeIfAbsent(epoch, e -> new AsyncPromise<>()); fetchTopologyHandler.apply(epoch, this); - return future; + return; } @Override @@ -78,11 +74,6 @@ public class MockConfigurationService implements TestableConfigurationService for (Listener listener : listeners) listener.onTopologyUpdate(topology); - - AsyncPromise<Void> promise = pending.remove(topology.epoch()); - if (promise == null) - return; - promise.setSuccess(null); } public synchronized void reportSyncComplete(Node.Id node, long epoch) diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java b/accord-core/src/test/java/accord/messages/PreAcceptTest.java index 87cc0c5..e3b384f 100644 --- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java +++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java @@ -48,12 +48,12 @@ public class PreAcceptTest () -> store, new TestAgent(), scheduler, - CommandStore.Factory.SINGLE_THREAD); + CommandStores.SingleThread::new); } private static TxnRequest.Scope scope(TxnId txnId, Txn txn) { - return new 
TxnRequest.Scope(txnId.epoch, new TxnRequest.Scope.KeysForEpoch(txnId.epoch, txn.keys())); + return new TxnRequest.Scope(txnId.epoch, txn.keys()); } private static PreAccept preAccept(TxnId txnId, Txn txn) @@ -72,7 +72,7 @@ public class PreAcceptTest try { IntKey key = IntKey.key(10); - CommandStore commandStore = node.local(key).orElseThrow(); + CommandStore commandStore = node.unsafeForKey(key); Assertions.assertFalse(commandStore.hasCommandsForKey(key)); TxnId txnId = clock.idForNode(1, ID2); @@ -104,7 +104,7 @@ public class PreAcceptTest try { IntKey key = IntKey.key(10); - CommandStore commandStore = node.local(key).orElseThrow(); + CommandStore commandStore = node.unsafeForKey(key); Assertions.assertFalse(commandStore.hasCommandsForKey(key)); TxnId txnId = clock.idForNode(1, ID2); @@ -195,7 +195,7 @@ public class PreAcceptTest try { IntKey key = IntKey.key(10); - CommandStore commandStore = node.local(key).orElseThrow(); + CommandStore commandStore = node.unsafeForKey(key); configService(node).reportTopology(node.topology().current().withEpoch(2)); messageSink.clearHistory(); diff --git a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java index a1250b8..3a0dca4 100644 --- a/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java +++ b/accord-core/src/test/java/accord/messages/TxnRequestScopeTest.java @@ -1,9 +1,7 @@ package accord.messages; import accord.api.KeyRange; -import accord.impl.IntKey; import accord.messages.TxnRequest.Scope; -import accord.messages.TxnRequest.Scope.KeysForEpoch; import accord.topology.Topologies; import accord.topology.Topology; import accord.txn.Keys; @@ -12,29 +10,20 @@ import org.junit.jupiter.api.Test; import static accord.Utils.*; import static accord.Utils.idSet; +import static accord.impl.IntKey.keys; import static accord.impl.IntKey.range; public class TxnRequestScopeTest { - private static KeysForEpoch epochRanges(long epoch, 
Keys keys) + private static Scope scope(long epoch, Keys keys) { - return new KeysForEpoch(epoch, keys); - } - - private static KeysForEpoch epochRanges(long epoch, int... keys) - { - return epochRanges(epoch, IntKey.keys(keys)); - } - - private static Scope scope(long epoch, KeysForEpoch... epochKeys) - { - return new Scope(epoch, epochKeys); + return new Scope(epoch, keys); } @Test void createDisjointScopeTest() { - Keys keys = IntKey.keys(150); + Keys keys = keys(150); KeyRange range = range(100, 200); Topology topology1 = topology(1, shard(range, idList(1, 2, 3), idSet(1, 2))); Topology topology2 = topology(2, shard(range, idList(4, 5, 6), idSet(4, 5))); @@ -43,16 +32,16 @@ public class TxnRequestScopeTest topologies.add(topology2); topologies.add(topology1); - Assertions.assertEquals(scope(2, epochRanges(1, 150)), + Assertions.assertEquals(scope(1, keys(150)), Scope.forTopologies(id(1), topologies, keys)); - Assertions.assertEquals(scope(2, epochRanges(2, 150)), + Assertions.assertEquals(scope(2, keys(150)), Scope.forTopologies(id(4), topologies, keys)); } @Test void movingRangeTest() { - Keys keys = IntKey.keys(150, 250); + Keys keys = keys(150, 250); KeyRange range1 = range(100, 200); KeyRange range2 = range(200, 300); Topology topology1 = topology(1, @@ -66,9 +55,9 @@ public class TxnRequestScopeTest Topologies.Multi topologies = new Topologies.Multi(); topologies.add(topology2); topologies.add(topology1); - Assertions.assertEquals(scope(2, epochRanges(1, 150), epochRanges(2, 250)), + Assertions.assertEquals(scope(2, keys(150, 250)), Scope.forTopologies(id(1), topologies, keys)); - Assertions.assertEquals(scope(2, epochRanges(1, 250), epochRanges(2, 150)), + Assertions.assertEquals(scope(2, keys(250, 150)), Scope.forTopologies(id(4), topologies, keys)); } } diff --git a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java index c794c24..06f088f 100644 --- 
a/accord-core/src/test/java/accord/topology/TopologyRandomizer.java +++ b/accord-core/src/test/java/accord/topology/TopologyRandomizer.java @@ -18,6 +18,7 @@ public class TopologyRandomizer private final Supplier<Random> randomSupplier; private final Function<Node.Id, Node> lookup; private final List<Topology> epochs = new ArrayList<>(); + private final Map<Node.Id, KeyRanges> previouslyReplicated = new HashMap<>(); public TopologyRandomizer(Supplier<Random> randomSupplier, Topology initialTopology, Function<Node.Id, Node> lookup) { @@ -25,6 +26,8 @@ public class TopologyRandomizer this.lookup = lookup; this.epochs.add(Topology.EMPTY); this.epochs.add(initialTopology); + for (Node.Id node : initialTopology.nodes()) + previouslyReplicated.put(node, initialTopology.rangesForNode(node)); } private enum UpdateType @@ -155,9 +158,41 @@ public class TopologyRandomizer return shards; } + private static Map<Node.Id, KeyRanges> getAdditions(Topology current, Topology next) + { + Map<Node.Id, KeyRanges> additions = new HashMap<>(); + for (Node.Id node : next.nodes()) + { + KeyRanges prev = current.rangesForNode(node); + if (prev == null) prev = KeyRanges.EMPTY; + + KeyRanges added = next.rangesForNode(node).difference(prev); + if (added.isEmpty()) + continue; + + additions.put(node, added); + } + return additions; + } + + private static boolean reassignsRanges(Topology current, Shard[] nextShards, Map<Node.Id, KeyRanges> previouslyReplicated) + { + Topology next = new Topology(current.epoch + 1, nextShards); + Map<Node.Id, KeyRanges> additions = getAdditions(current, next); + + for (Map.Entry<Node.Id, KeyRanges> entry : additions.entrySet()) + { + if (previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY).intersects(entry.getValue())) + return true; + } + return false; + } + public synchronized void maybeUpdateTopology() { - if (randomSupplier.get().nextInt(50) != 0) + // if we don't limit the number of pending topology changes in flight, + // the topology 
randomizer will keep the burn test busy indefinitely + if (TopologyUpdate.pendingTopologies() > 5 || randomSupplier.get().nextInt(200) != 0) return; Random random = randomSupplier.get(); @@ -172,6 +207,21 @@ public class TopologyRandomizer Topology nextTopology = new Topology(current.epoch + 1, shards); + // FIXME: remove this (and the corresponding check in CommandStores) once lower bounds are implemented. + // In the meantime, the logic needed to support acquiring ranges that we previously replicated is pretty + // convoluted without the ability to jettison epochs. + if (reassignsRanges(current, shards, previouslyReplicated)) + return; + + Map<Node.Id, KeyRanges> nextAdditions = getAdditions(current, nextTopology); + for (Map.Entry<Node.Id, KeyRanges> entry : nextAdditions.entrySet()) + { + KeyRanges previous = previouslyReplicated.getOrDefault(entry.getKey(), KeyRanges.EMPTY); + KeyRanges added = entry.getValue(); + KeyRanges merged = previous.merge(added).mergeTouching(); + previouslyReplicated.put(entry.getKey(), merged); + } + logger.debug("topology update to: {} from: {}", nextTopology, current); epochs.add(nextTopology); diff --git a/accord-core/src/test/java/accord/utils/MessageTask.java b/accord-core/src/test/java/accord/utils/MessageTask.java index 4db5452..290251c 100644 --- a/accord-core/src/test/java/accord/utils/MessageTask.java +++ b/accord-core/src/test/java/accord/utils/MessageTask.java @@ -123,6 +123,7 @@ public class MessageTask extends AsyncPromise<Void> implements Runnable List<Node.Id> recipients, String desc, NodeProcess process) { + Preconditions.checkArgument(!recipients.isEmpty()); this.originator = originator; this.recipients = ImmutableList.copyOf(recipients); this.desc = desc; diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java index af4d294..eb0eeeb 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java +++ 
b/accord-maelstrom/src/main/java/accord/maelstrom/Cluster.java @@ -14,7 +14,7 @@ import java.util.function.LongSupplier; import java.util.function.Supplier; import accord.coordinate.Timeout; -import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.Node.Id; import accord.api.MessageSink; @@ -264,7 +264,7 @@ public class Cluster implements Scheduler { MessageSink messageSink = sinks.create(node, randomSupplier.get()); lookup.put(node, new Node(node, messageSink, new SimpleConfigService(topology), - nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStore.Factory.SINGLE_THREAD)); + nowSupplier.get(), MaelstromStore::new, MaelstromAgent.INSTANCE, sinks, CommandStores.SingleThread::new)); } List<Id> nodesList = new ArrayList<>(Arrays.asList(nodes)); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java index ef2cb62..1be6e20 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromData.java @@ -10,7 +10,8 @@ public class MaelstromData extends TreeMap<Key, Value> implements Data @Override public Data merge(Data data) { - this.putAll(((MaelstromData)data)); + if (data != null) + this.putAll(((MaelstromData)data)); return this; } } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java index 5585138..5cad157 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/MaelstromReplyContext.java @@ -18,6 +18,8 @@ public class MaelstromReplyContext implements ReplyContext public static long messageIdFor(ReplyContext replyContext) { + if (replyContext instanceof Packet) + return ((Packet) replyContext).body.msg_id; return 
((MaelstromReplyContext) replyContext).messageId; } } diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java index 30c5930..1c4e7f9 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/Main.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/Main.java @@ -15,6 +15,7 @@ import java.util.function.Supplier; import accord.coordinate.Timeout; import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.Node.Id; import accord.api.Scheduler; @@ -139,7 +140,8 @@ public class Main MaelstromInit init = (MaelstromInit) packet.body; topology = topologyFactory.toTopology(init.cluster); sink = new StdoutSink(System::currentTimeMillis, scheduler, start, init.self, out, err); - on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStore.Factory.SINGLE_THREAD); + on = new Node(init.self, sink, new SimpleConfigService(topology), System::currentTimeMillis, + MaelstromStore::new, MaelstromAgent.INSTANCE, scheduler, CommandStores.SingleThread::new); err.println("Initialized node " + init.self); err.flush(); sink.send(packet.src, new Body(Type.init_ok, Body.SENTINEL_MSG_ID, init.msg_id)); diff --git a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java index b6ec202..24dbbe3 100644 --- a/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java +++ b/accord-maelstrom/src/main/java/accord/maelstrom/SimpleConfigService.java @@ -35,9 +35,9 @@ public class SimpleConfigService implements ConfigurationService } @Override - public Future<Void> fetchTopologyForEpoch(long epoch) + public void fetchTopologyForEpoch(long epoch) { - return SUCCESS; + return; } @Override --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org