This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 0a338da88d3eaef773c2bb8f2f16350a29b65e11 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Fri Aug 2 14:17:42 2024 +0100 CASSANDRA-19825: Fix various bugs and abstraction deficiencies, including: - Remove concept of non-participating home keys; home keys are required to be a participant in the transaction - Remove covering/covers concept - Various invalidation/truncation/erase behaviours patch by Benedict; reviewed by Blake for CASSANDRA-19825 --- modules/accord | 2 +- .../db/compaction/CompactionIterator.java | 16 +++++----- .../index/accord/CheckpointIntervalArrayIndex.java | 2 +- .../service/accord/AccordFetchCoordinator.java | 6 +++- .../service/accord/AccordObjectSizes.java | 16 +++++----- .../cassandra/service/accord/AccordService.java | 11 ++++--- .../cassandra/service/accord/AccordTopology.java | 13 ++++---- .../cassandra/service/accord/IAccordService.java | 18 +++++++++-- .../service/accord/api/AccordTopologySorter.java | 3 +- .../service/accord/fastpath/FastPathStrategy.java | 4 +-- .../fastpath/InheritKeyspaceFastPathStrategy.java | 4 +-- .../fastpath/ParameterizedFastPathStrategy.java | 13 ++++---- .../accord/fastpath/SimpleFastPathStrategy.java | 21 +++++++------ .../accord/repair/RepairSyncPointAdapter.java | 2 +- .../accord/serializers/CommandSerializers.java | 6 +--- .../service/accord/serializers/DepsSerializer.java | 12 ++++---- .../serializers/InformHomeDurableSerializers.java | 13 ++------ .../service/accord/serializers/KeySerializers.java | 36 ++++++---------------- .../accord/serializers/TopologySerializers.java | 4 +-- .../cassandra/service/accord/txn/TxnRead.java | 16 ++++++++-- .../cassandra/service/accord/txn/TxnUpdate.java | 9 ++++++ .../accord/txn/UnrecoverableRepairUpdate.java | 7 +++++ .../cassandra/utils/CollectionSerializers.java | 10 ++++++ .../compaction/CompactionAccordIteratorsTest.java | 9 ++++-- .../index/accord/AccordIndexStressTest.java | 2 +- .../cassandra/index/accord/RouteIndexTest.java | 2 +- .../service/accord/AccordCommandStoreTest.java | 3 +- .../service/accord/AccordCommandTest.java | 6 ++-- .../accord/AccordConfigurationServiceTest.java | 4 +-- .../service/accord/AccordKeyspaceTest.java | 2 +- .../service/accord/AccordMessageSinkTest.java | 5 +-- .../cassandra/service/accord/AccordTestUtils.java | 15 ++++----- .../service/accord/AccordTopologyUtils.java | 4 +-- .../ParameterizedFastPathStrategyTest.java | 4 +-- .../serializers/CheckStatusSerializersTest.java | 2 +- .../serializers/CommandsForKeySerializerTest.java | 2 +- .../org/apache/cassandra/utils/RangeTreeTest.java | 2 +- 37 files changed, 173 insertions(+), 133 deletions(-) diff --git a/modules/accord b/modules/accord index 5f360e0b5b..13de69240b 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 5f360e0b5b197156df0ef3d9985cd94d18ea1c92 +Subproject commit 13de69240b81a5e87d5965ed2d1d92c58cf076cf diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index e0dcb5682f..4dbeae7896 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; import accord.local.Cleanup; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.RedundantBefore; import accord.local.SaveStatus; @@ -87,7 +88,6 @@ import org.apache.cassandra.service.accord.IAccordService; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.paxos.PaxosRepairHistory; import org.apache.cassandra.service.paxos.uncommitted.PaxosRows; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.TimeUUID; import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME; @@ -784,6 +784,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte class AccordCommandsPurger extends AbstractPurger { final Int2ObjectHashMap<RedundantBefore> redundantBefores; + final Int2ObjectHashMap<RangesForEpoch> ranges; final DurableBefore durableBefore; int storeId; @@ -791,9 +792,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordCommandsPurger(Supplier<IAccordService> accordService) { - Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> redundantBeforesAndDurableBefore = accordService.get().getRedundantBeforesAndDurableBefore(); - this.redundantBefores = redundantBeforesAndDurableBefore.left; - this.durableBefore = redundantBeforesAndDurableBefore.right; + IAccordService.CompactionInfo compactionInfo = accordService.get().getCompactionInfo(); + this.redundantBefores = compactionInfo.redundantBefores; + this.ranges = compactionInfo.ranges; + this.durableBefore = compactionInfo.durableBefore; } protected void beginPartition(UnfilteredRowIterator partition) @@ -815,7 +817,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte // When commands end up being sliced by compaction we need this to discard tombstones and slices // without enough information to run the rest of the cleanup logic - if (Cleanup.isSafeToCleanup(durableBefore, txnId)) + if (Cleanup.isSafeToCleanup(durableBefore, txnId, ranges.get(storeId).allAt(txnId.epoch()))) return null; Cell durabilityCell = row.getCell(CommandsColumns.durability); @@ -878,7 +880,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordTimestampsForKeyPurger(Supplier<IAccordService> accordService) { - this.redundantBefores = accordService.get().getRedundantBeforesAndDurableBefore().left; + this.redundantBefores = accordService.get().getCompactionInfo().redundantBefores; } protected void beginPartition(UnfilteredRowIterator partition) @@ -954,7 +956,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor, Supplier<IAccordService> accordService) { this.accessor = accessor; - this.redundantBefores = accordService.get().getRedundantBeforesAndDurableBefore().left; + this.redundantBefores = accordService.get().getCompactionInfo().redundantBefores; } protected void beginPartition(UnfilteredRowIterator partition) diff --git a/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java b/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java index 9cf950410d..e478d26982 100644 --- a/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java +++ b/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java @@ -609,7 +609,7 @@ public class CheckpointIntervalArrayIndex }; var searcher = new CheckpointIntervalArray<>(accessor, indexInput, checkpoints.bounds, checkpoints.headers, checkpoints.lists, checkpoints.maxScanAndCheckpointMatches); - searcher.forEach(start, end, (i1, i2, i3, i4, index) -> { + searcher.forEachRange(start, end, (i1, i2, i3, i4, index) -> { stats.matches++; callback.accept(reader.copyTo(accessor.get(indexInput, index), buffer)); }, (i1, i2, i3, i4, startIdx, endIdx) -> { diff --git a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java index 088c6c1331..e4ad2aa559 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java +++ b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java @@ -35,6 +35,7 @@ import accord.local.CommandStore; import accord.local.Node; import accord.local.SafeCommandStore; import accord.primitives.PartialTxn; +import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.Ranges; import accord.primitives.Routable; @@ -290,6 +291,9 @@ public class AccordFetchCoordinator extends AbstractFetchCoordinator implements @Override public Read slice(Ranges ranges) { return new StreamingRead(to, this.ranges.slice(ranges)); } + @Override + public Read intersecting(Participants<?> participants) { return new StreamingRead(to, this.ranges.intersecting(ranges)); } + @Override public Read merge(Read other) { throw new UnsupportedOperationException(); } } @@ -383,7 +387,7 @@ public class AccordFetchCoordinator extends AbstractFetchCoordinator implements protected PartialTxn rangeReadTxn(Ranges ranges) { StreamingRead read = new StreamingRead(FBUtilities.getBroadcastAddressAndPort(), ranges); - return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, read, noopQuery, null); + return new PartialTxn.InMemory(Txn.Kind.Read, ranges, read, noopQuery, null); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index d3cd7a34fd..46e0d8c158 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -137,7 +137,7 @@ public class AccordObjectSizes return EMPTY_ROUTING_KEYS_SIZE + routingKeysOnly(keys); } - private static final long EMPTY_FULL_KEY_ROUTE_SIZE = measure(new FullKeyRoute(new TokenKey(null, null), true, new RoutingKey[0])); + private static final long EMPTY_FULL_KEY_ROUTE_SIZE = measure(new FullKeyRoute(new TokenKey(null, null), new RoutingKey[0])); public static long fullKeyRoute(FullKeyRoute route) { return EMPTY_FULL_KEY_ROUTE_SIZE @@ -145,12 +145,11 @@ public class AccordObjectSizes + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error } - private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = measure(new PartialKeyRoute(Ranges.EMPTY, new TokenKey(null, null), true, new RoutingKey[0])); + private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = measure(new PartialKeyRoute(new TokenKey(null, null), new RoutingKey[0])); public static long partialKeyRoute(PartialKeyRoute route) { return EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE + routingKeysOnly(route) - + ranges(route.covering()) + key(route.homeKey()); } @@ -162,7 +161,7 @@ public class AccordObjectSizes return size; } - private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = measure(new FullRangeRoute(new TokenKey(null, null), true, new Range[0])); + private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = measure(new FullRangeRoute(new TokenKey(null, null), new Range[0])); public static long fullRangeRoute(FullRangeRoute route) { return EMPTY_FULL_RANGE_ROUTE_SIZE @@ -170,12 +169,11 @@ public class AccordObjectSizes + key(route.homeKey()); // TODO: we will probably dedup homeKey, serializer dependent, but perhaps this is an acceptable error } - private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE = measure(new PartialRangeRoute(Ranges.EMPTY, new TokenKey(null, null), true, new Range[0])); + private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE = measure(new PartialRangeRoute(new TokenKey(null, null), new Range[0])); public static long partialRangeRoute(PartialRangeRoute route) { return EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE + rangesOnly(route) - + ranges(route.covering()) + key(route.homeKey()); } @@ -193,7 +191,7 @@ public class AccordObjectSizes } } - private static final long EMPTY_TXN = measure(new PartialTxn.InMemory(null, null, null, null, null, null)); + private static final long EMPTY_TXN = measure(new PartialTxn.InMemory(null, null, null, null, null)); public static long txn(PartialTxn txn) { long size = EMPTY_TXN; @@ -283,13 +281,13 @@ public class AccordObjectSizes private static CommonAttributes attrs(boolean hasDeps, boolean hasTxn) { - CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(EMPTY_TXNID).route(new FullKeyRoute(EMPTY_KEY, true, new RoutingKey[]{ EMPTY_KEY })); + CommonAttributes.Mutable attrs = new CommonAttributes.Mutable(EMPTY_TXNID).route(new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{ EMPTY_KEY })); attrs.durability(Status.Durability.NotDurable); if (hasDeps) attrs.partialDeps(PartialDeps.NONE); if (hasTxn) - attrs.partialTxn(new PartialTxn.InMemory(null, null, null, null, null, null)); + attrs.partialTxn(new PartialTxn.InMemory(null, null, null, null, null)); return attrs; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 752e041f48..75450fff2f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -45,6 +45,7 @@ import accord.coordinate.Exhausted; import accord.coordinate.FailureAccumulator; import accord.coordinate.TopologyMismatch; import accord.impl.CoordinateDurabilityScheduling; +import accord.local.CommandStores; import accord.primitives.SyncPoint; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.statements.RequestValidations; @@ -226,9 +227,9 @@ public class AccordService implements IAccordService, Shutdownable public void receive(Message<List<AccordSyncPropagator.Notification>> message) {} @Override - public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore() + public CompactionInfo getCompactionInfo() { - return Pair.create(new Int2ObjectHashMap<>(), DurableBefore.EMPTY); + return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY); } }; @@ -773,17 +774,19 @@ public class AccordService implements IAccordService, Shutdownable } @Override - public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore() + public CompactionInfo getCompactionInfo() { Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); + Int2ObjectHashMap<CommandStores.RangesForEpoch>ranges = new Int2ObjectHashMap<>(); AtomicReference<DurableBefore> durableBefore = new AtomicReference<>(DurableBefore.EMPTY); AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> { synchronized (redundantBefores) { redundantBefores.put(safeStore.commandStore().id(), safeStore.commandStore().redundantBefore()); + ranges.put(safeStore.commandStore().id(), safeStore.ranges()); } durableBefore.set(DurableBefore.merge(durableBefore.get(), safeStore.commandStore().durableBefore())); })); - return Pair.create(redundantBefores, durableBefore.get()); + return new CompactionInfo(redundantBefores, ranges, durableBefore.get()); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index 0814c322d5..7bbfc0c250 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -32,6 +32,7 @@ import accord.local.Node; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.Invariants; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; @@ -64,7 +65,7 @@ public class AccordTopology private static class ShardLookup extends HashMap<accord.primitives.Range, Shard> { - private Shard createOrReuse(accord.primitives.Range range, List<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id> joining) + private Shard createOrReuse(accord.primitives.Range range, SortedArrayList<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id> joining) { Shard prev = get(range); if (prev != null @@ -81,10 +82,10 @@ public class AccordTopology { private final KeyspaceMetadata keyspace; private final Range<Token> range; - private final List<Node.Id> nodes; + private final SortedArrayList<Node.Id> nodes; private final Set<Node.Id> pending; - private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range, List<Node.Id> nodes, Set<Node.Id> pending) + private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range, SortedArrayList<Node.Id> nodes, Set<Node.Id> pending) { this.keyspace = keyspace; this.range = range; @@ -106,7 +107,7 @@ public class AccordTopology { TokenRange tokenRange = AccordTopology.range(metadata.id, range); - Set<Node.Id> fastPath = strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap); + SortedArrayList<Node.Id> fastPath = strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap); return lookup.createOrReuse(tokenRange, nodes, fastPath, pending); } @@ -122,10 +123,10 @@ public class AccordTopology Sets.SetView<InetAddressAndPort> readOnly = Sets.difference(readEndpoints, writeEndpoints); Invariants.checkState(readOnly.isEmpty(), "Read only replicas detected: %s", readOnly); - List<Node.Id> nodes = writes.endpoints().stream() + SortedArrayList<Node.Id> nodes = new SortedArrayList<>(writes.endpoints().stream() .map(directory::peerId) .map(AccordTopology::tcmIdToAccord) - .sorted().collect(Collectors.toList()); + .sorted().toArray(Node.Id[]::new)); Set<Node.Id> pending = readEndpoints.equals(writeEndpoints) ? Collections.emptySet() : diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index ae0d08c48e..828ca1f287 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord; import accord.api.BarrierType; +import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.Node.Id; import accord.local.RedundantBefore; @@ -40,7 +41,6 @@ import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import org.apache.cassandra.service.accord.api.AccordScheduler; import org.apache.cassandra.service.accord.txn.TxnResult; import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Future; import javax.annotation.Nonnull; @@ -108,10 +108,24 @@ public interface IAccordService void receive(Message<List<AccordSyncPropagator.Notification>> message); + class CompactionInfo + { + public final Int2ObjectHashMap<RedundantBefore> redundantBefores; + public final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges; + public final DurableBefore durableBefore; + + public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges, DurableBefore durableBefore) + { + this.redundantBefores = redundantBefores; + this.ranges = ranges; + this.durableBefore = durableBefore; + } + } + /** * Fetch the redundnant befores for every command store */ - Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore(); + CompactionInfo getCompactionInfo(); default Id nodeId() { throw new UnsupportedOperationException(); } } diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java index cbc46ef9a0..7d1ab21224 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java @@ -27,6 +27,7 @@ import accord.local.Node; import accord.topology.ShardSelection; import accord.topology.Topologies; import accord.topology.Topology; +import accord.utils.SortedList; import org.apache.cassandra.locator.DynamicEndpointSnitch; import org.apache.cassandra.locator.Endpoint; import org.apache.cassandra.locator.IEndpointSnitch; @@ -61,7 +62,7 @@ public class AccordTopologySorter implements TopologySorter return create(topologies.nodes()); } - private AccordTopologySorter create(Set<Node.Id> nodes) + private AccordTopologySorter create(SortedList<Node.Id> nodes) { SortableEndpoints endpoints = SortableEndpoints.from(nodes, mapper); Comparator<Endpoint> comparator = snitch.endpointComparator(FBUtilities.getBroadcastAddressAndPort(), endpoints); diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java index 6978721a81..f6a81b0b66 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java @@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.fastpath; import java.io.IOException; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; @@ -29,6 +28,7 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; import accord.local.Node; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus; @@ -83,7 +83,7 @@ public interface FastPathStrategy * @param dcMap * @return */ - Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap); + SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap); Kind kind(); diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java index 08b7763a93..2ffc3c7cbf 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java @@ -18,13 +18,13 @@ package org.apache.cassandra.service.accord.fastpath; -import java.util.List; import java.util.Map; import java.util.Set; import com.google.common.collect.ImmutableMap; import accord.local.Node; +import accord.utils.SortedArrays.SortedArrayList; public class InheritKeyspaceFastPathStrategy implements FastPathStrategy { @@ -35,7 +35,7 @@ public class InheritKeyspaceFastPathStrategy implements FastPathStrategy private InheritKeyspaceFastPathStrategy() {} @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { throw new IllegalStateException("InheritKeyspaceFastPathStrategy should be replaced before calculateFastPath is called"); } diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java index aef562f99a..dfe027367e 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java @@ -20,6 +20,7 @@ package org.apache.cassandra.service.accord.fastpath; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -32,12 +33,12 @@ import java.util.stream.Collectors; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import accord.api.VisibleForImplementation; import accord.local.Node; import accord.topology.Shard; import accord.utils.Invariants; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus; @@ -204,7 +205,7 @@ public class ParameterizedFastPathStrategy implements FastPathStrategy } @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { List<NodeSorter> sorters = new ArrayList<>(nodes.size()); @@ -221,12 +222,12 @@ public class ParameterizedFastPathStrategy implements FastPathStrategy int slowQuorum = Shard.slowPathQuorumSize(nodes.size()); int fpSize = Math.max(size, slowQuorum); - ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder(); - + Node.Id[] array = new Node.Id[fpSize]; for (int i=0; i<fpSize; i++) - builder.add(sorters.get(i).id); + array[i] = sorters.get(i).id; - ImmutableSet<Node.Id> fastPath = builder.build(); + Arrays.sort(array); + SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array); Invariants.checkState(fastPath.size() >= slowQuorum); return fastPath; } diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java index 3f2e27bdc8..37d51b6c82 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java @@ -18,16 +18,16 @@ package org.apache.cassandra.service.accord.fastpath; -import java.util.List; import java.util.Map; import java.util.Set; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import accord.local.Node; import accord.topology.Shard; +import accord.utils.ArrayBuffers; import accord.utils.Invariants; +import accord.utils.SortedArrays.SortedArrayList; public class SimpleFastPathStrategy implements FastPathStrategy { @@ -38,26 +38,27 @@ public class SimpleFastPathStrategy implements FastPathStrategy private SimpleFastPathStrategy() {} @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { int maxFailures = Shard.maxToleratedFailures(nodes.size()); int discarded = 0; - ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder(); + if (unavailable.isEmpty()) + return nodes; + Object[] tmp = ArrayBuffers.cachedAny().get(nodes.size()); for (int i=0,mi=nodes.size(); i<mi; i++) { Node.Id node = nodes.get(i); if (unavailable.contains(node) && discarded < maxFailures) - { discarded++; - continue; - } - - builder.add(node); + else + tmp[i - discarded] = node; } - Set<Node.Id> fastPath = builder.build(); + Node.Id[] array = new Node.Id[nodes.size() - discarded]; + System.arraycopy(tmp, 0, array, 0, nodes.size() - discarded); + SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array); Invariants.checkState(fastPath.size() >= Shard.slowPathQuorumSize(nodes.size())); return fastPath; } diff --git a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java index 76e29adbc5..8cc04f60da 100644 --- a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java +++ b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java @@ -61,7 +61,7 @@ public class RepairSyncPointAdapter<S extends Seekables<?, ?>> extends Coordinat public void execute(Node node, Topologies all, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<S>, Throwable> callback) { RequiredResponseTracker tracker = new RequiredResponseTracker(requiredResponses, all); - ExecuteSyncPoint.ExecuteBlocking<S> execute = new ExecuteSyncPoint.ExecuteBlocking<>(node, tracker, new SyncPoint<>(txnId, deps, (S) txn.keys(), route), executeAt); + ExecuteSyncPoint.ExecuteBlocking<S> execute = new ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<>(txnId, deps, (S) txn.keys(), route), tracker, executeAt); execute.addCallback(callback); execute.start(); } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index fe16d3033e..cd76550262 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -36,7 +36,6 @@ import accord.local.Status.Known; import accord.primitives.Ballot; import accord.primitives.PartialTxn; import accord.primitives.ProgressToken; -import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.Txn; @@ -234,7 +233,6 @@ public class CommandSerializers private void serializeWithoutKeys(PartialTxn txn, DataOutputPlus out, int version) throws IOException { CommandSerializers.kind.serialize(txn.kind(), out, version); - KeySerializers.ranges.serialize(txn.covering(), out, version); readSerializer.serialize(txn.read(), out, version); querySerializer.serialize(txn.query(), out, version); out.writeBoolean(txn.update() != null); @@ -245,18 +243,16 @@ public class CommandSerializers private PartialTxn deserializeWithoutKeys(Seekables<?, ?> keys, DataInputPlus in, int version) throws IOException { Txn.Kind kind = CommandSerializers.kind.deserialize(in, version); - Ranges covering = KeySerializers.ranges.deserialize(in, version); Read read = readSerializer.deserialize(in, version); Query query = querySerializer.deserialize(in, version); Update update = in.readBoolean() ? updateSerializer.deserialize(in, version) : null; - return new PartialTxn.InMemory(covering, kind, keys, read, query, update); + return new PartialTxn.InMemory(kind, keys, read, query, update); } private long serializedSizeWithoutKeys(PartialTxn txn, int version) { long size = CommandSerializers.kind.serializedSize(txn.kind(), version); - size += KeySerializers.ranges.serializedSize(txn.covering(), version); size += readSerializer.serializedSize(txn.read(), version); size += querySerializer.serializedSize(txn.query(), version); size += TypeSizes.sizeof(txn.update() != null); diff --git a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java index dd78761cda..841d89882e 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java @@ -25,9 +25,9 @@ import accord.primitives.Deps; import accord.primitives.KeyDeps; import accord.primitives.Keys; import accord.primitives.PartialDeps; +import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.RangeDeps; -import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.TxnId; import org.apache.cassandra.io.IVersionedSerializer; @@ -60,7 +60,7 @@ public abstract class DepsSerializer<D extends Deps> extends IVersionedWithKeysS @Override PartialDeps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, KeyDeps directKeyDeps, DataInputPlus in, int version) throws IOException { - Ranges covering = KeySerializers.ranges.deserialize(in, version); + Participants<?> covering = KeySerializers.participants.deserialize(in, version); return new PartialDeps(covering, keyDeps, rangeDeps, directKeyDeps); } @@ -68,28 +68,28 @@ public abstract class DepsSerializer<D extends Deps> extends IVersionedWithKeysS public void serialize(PartialDeps partialDeps, DataOutputPlus out, int version) throws IOException { super.serialize(partialDeps, out, version); - KeySerializers.ranges.serialize(partialDeps.covering, out, version); + KeySerializers.participants.serialize(partialDeps.covering, out, version); } @Override public void serialize(Seekables<?, ?> superset, PartialDeps partialDeps, DataOutputPlus out, int version) throws IOException { super.serialize(superset, partialDeps, out, version); - KeySerializers.ranges.serialize(partialDeps.covering, out, version); + KeySerializers.participants.serialize(partialDeps.covering, out, version); } @Override public long serializedSize(PartialDeps partialDeps, int version) { return super.serializedSize(partialDeps, version) - + KeySerializers.ranges.serializedSize(partialDeps.covering, version); + + KeySerializers.participants.serializedSize(partialDeps.covering, version); } @Override public long serializedSize(Seekables<?, ?> keys, PartialDeps partialDeps, int version) { return super.serializedSize(keys, partialDeps, version) - + KeySerializers.ranges.serializedSize(partialDeps.covering, version); + + KeySerializers.participants.serializedSize(partialDeps.covering, version); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java index 50f53a04f6..c4f2155577 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java @@ -25,13 +25,9 @@ import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import static org.apache.cassandra.utils.CollectionSerializers.deserializeSet; -import static org.apache.cassandra.utils.CollectionSerializers.serializeCollection; -import static org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize; - public class InformHomeDurableSerializers { - public static final IVersionedSerializer<InformHomeDurable> request = new IVersionedSerializer<InformHomeDurable>() + public static final IVersionedSerializer<InformHomeDurable> request = new IVersionedSerializer<>() { @Override public void serialize(InformHomeDurable inform, DataOutputPlus out, int version) throws IOException @@ -40,7 +36,6 @@ public class InformHomeDurableSerializers KeySerializers.route.serialize(inform.route, out, version); CommandSerializers.timestamp.serialize(inform.executeAt, out, version); CommandSerializers.durability.serialize(inform.durability, out, version); - serializeCollection(inform.persistedOn, out, version, TopologySerializers.nodeId); } @Override @@ -49,8 +44,7 @@ public class InformHomeDurableSerializers return new InformHomeDurable(CommandSerializers.txnId.deserialize(in, version), KeySerializers.route.deserialize(in, version), CommandSerializers.timestamp.deserialize(in, version), - CommandSerializers.durability.deserialize(in, version), - deserializeSet(in, version, TopologySerializers.nodeId)); + CommandSerializers.durability.deserialize(in, version)); } @Override @@ -59,8 +53,7 @@ public class InformHomeDurableSerializers return CommandSerializers.txnId.serializedSize(inform.txnId, version) + KeySerializers.route.serializedSize(inform.route, version) + CommandSerializers.timestamp.serializedSize(inform.executeAt, version) - + CommandSerializers.durability.serializedSize(inform.durability, version) - + serializedCollectionSize(inform.persistedOn, version, TopologySerializers.nodeId); + + CommandSerializers.durability.serializedSize(inform.durability, version); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java index 6d2e86e8bb..1e105d8192 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java @@ -91,38 +91,31 @@ public class KeySerializers { @Override PartialKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException { - Ranges covering = ranges.deserialize(in, version); RoutingKey homeKey = routingKey.deserialize(in, version); - boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0; - return PartialKeyRoute.SerializationSupport.create(covering, homeKey, isParticipatingHomeKey, keys); + return PartialKeyRoute.SerializationSupport.create(homeKey, keys); } @Override public void serialize(PartialKeyRoute route, DataOutputPlus out, int version) throws IOException { super.serialize(route, out, version); - ranges.serialize(route.covering, out, version); routingKey.serialize(route.homeKey, out, version); - out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0); } @Override public long serializedSize(PartialKeyRoute keys, int version) { return super.serializedSize(keys, version) - + ranges.serializedSize(keys.covering, version) - + routingKey.serializedSize(keys.homeKey, version) - + 1; + + routingKey.serializedSize(keys.homeKey, version); } }; - public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<RoutingKey, FullKeyRoute>(routingKey, RoutingKey[]::new) + public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new AbstractKeysSerializer<>(routingKey, RoutingKey[]::new) { @Override FullKeyRoute deserialize(DataInputPlus in, int version, RoutingKey[] keys) throws IOException { RoutingKey homeKey = routingKey.deserialize(in, version); - boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0; - return FullKeyRoute.SerializationSupport.create(homeKey, isParticipatingHomeKey, keys); + return FullKeyRoute.SerializationSupport.create(homeKey, keys); } @Override @@ -130,15 +123,13 @@ public class KeySerializers { super.serialize(route, out, version); routingKey.serialize(route.homeKey, out, version); - out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0); } @Override public long serializedSize(FullKeyRoute route, int version) { return super.serializedSize(route, version) - + routingKey.serializedSize(route.homeKey, version) - + 1; + + routingKey.serializedSize(route.homeKey, version); } }; @@ -146,28 +137,22 @@ public class KeySerializers { @Override PartialRangeRoute deserialize(DataInputPlus in, int version, Range[] rs) throws IOException { - Ranges covering = ranges.deserialize(in, version); RoutingKey homeKey = routingKey.deserialize(in, version); - boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0; - return PartialRangeRoute.SerializationSupport.create(covering, homeKey, isParticipatingHomeKey, rs); + return PartialRangeRoute.SerializationSupport.create(homeKey, rs); } @Override public void serialize(PartialRangeRoute route, DataOutputPlus out, int version) throws IOException { super.serialize(route, out, version); - ranges.serialize(route.covering, out, version); routingKey.serialize(route.homeKey, out, version); - out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0); } @Override public long serializedSize(PartialRangeRoute rs, int version) { return super.serializedSize(rs, version) - + ranges.serializedSize(rs.covering, version) - + routingKey.serializedSize(rs.homeKey, version) - + 1; + + routingKey.serializedSize(rs.homeKey, version); } }; @@ -177,8 +162,7 @@ public class KeySerializers @Override FullRangeRoute deserialize(DataInputPlus in, int version, Range[] Ranges) throws IOException { RoutingKey homeKey = routingKey.deserialize(in, version); - boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0; - return FullRangeRoute.SerializationSupport.create(homeKey, isParticipatingHomeKey, Ranges); + return FullRangeRoute.SerializationSupport.create(homeKey, Ranges); } @Override @@ -186,15 +170,13 @@ public class KeySerializers { super.serialize(route, out, version); routingKey.serialize(route.homeKey, out, version); - out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0); } @Override public long serializedSize(FullRangeRoute ranges, int version) { return super.serializedSize(ranges, version) - + routingKey.serializedSize(ranges.homeKey(), version) - + 1; + + routingKey.serializedSize(ranges.homeKey(), version); } }; diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java index 4693c03c5c..782ecbf5ed 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java @@ -20,13 +20,13 @@ package org.apache.cassandra.service.accord.serializers; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.List; import java.util.Set; import accord.local.Node; import accord.primitives.Range; import accord.topology.Shard; import accord.topology.Topology; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.marshal.ValueAccessor; import org.apache.cassandra.io.IVersionedSerializer; @@ -135,7 +135,7 @@ public class TopologySerializers public Shard deserialize(DataInputPlus in, int version) throws IOException { Range range = TokenRange.serializer.deserialize(in, version); - List<Node.Id> nodes = CollectionSerializers.deserializeList(in, version, nodeId); + SortedArrayList<Node.Id> nodes = CollectionSerializers.deserializeSortedArrayList(in, version, nodeId, Node.Id[]::new); Set<Node.Id> fastPathElectorate = CollectionSerializers.deserializeSet(in, version, nodeId); Set<Node.Id> joining = CollectionSerializers.deserializeSet(in, version, nodeId); return new Shard(range, nodes, fastPathElectorate, joining); diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java index 62dfe68e2c..694c3f225a 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java @@ -32,6 +32,7 @@ import accord.api.DataStore; import accord.api.Read; import accord.local.SafeCommandStore; import accord.primitives.Keys; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Seekable; import accord.primitives.Timestamp; @@ -158,14 +159,25 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read @Override public Read slice(Ranges ranges) { - Keys keys = itemKeys.slice(ranges); + return intersecting(itemKeys.slice(ranges)); + } + + @Override + public Read intersecting(Participants<?> participants) + { + return intersecting(itemKeys.intersecting(participants)); + } + + private Read intersecting(Keys select) + { + Keys keys = itemKeys.intersecting(select); List<TxnNamedRead> reads = new ArrayList<>(keys.size()); for (TxnNamedRead read : items) if (keys.contains(read.key())) reads.add(read); - return createTxnRead(reads, txnKeys.slice(ranges), cassandraConsistencyLevel); + return createTxnRead(reads, txnKeys.intersecting(select), cassandraConsistencyLevel); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java index 5a32065e97..563af81675 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java @@ -32,6 +32,7 @@ import accord.api.Data; import accord.api.Key; import accord.api.Update; import accord.primitives.Keys; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.RoutableKey; import accord.primitives.Timestamp; @@ -148,6 +149,14 @@ public class TxnUpdate extends AccordUpdate return new TxnUpdate(keys, select(this.keys, keys, fragments), condition, cassandraCommitCL); } + @Override + public Update intersecting(Participants<?> participants) + { + Keys keys = this.keys.intersecting(participants); + // TODO: Slice the condition. + return new TxnUpdate(keys, select(this.keys, keys, fragments), condition, cassandraCommitCL); + } + private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from) { ByteBuffer[] result = new ByteBuffer[out.size()]; diff --git a/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java index 310a3e6a58..0efcbf1ffd 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java @@ -29,6 +29,7 @@ import accord.api.Data; import accord.api.Update; import accord.api.Write; import accord.local.Node; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Seekables; import accord.primitives.Timestamp; @@ -147,6 +148,12 @@ public class UnrecoverableRepairUpdate<E extends Endpoints<E>, P extends Replica return this; } + @Override + public Update intersecting(Participants<?> participants) + { + return this; + } + @Override public Update merge(Update other) { diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java b/src/java/org/apache/cassandra/utils/CollectionSerializers.java index 1fcb7cc2f3..0cf7fce0d9 100644 --- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java +++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java @@ -31,6 +31,7 @@ import javax.annotation.Nonnull; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.IPartitionerDependentSerializer; import org.apache.cassandra.io.IVersionedSerializer; @@ -102,6 +103,15 @@ public class CollectionSerializers } } + public static <V extends Comparable<? super V>> SortedArrayList<V> deserializeSortedArrayList(DataInputPlus in, int version, IVersionedSerializer<V> serializer, IntFunction<V[]> allocator) throws IOException + { + int size = in.readUnsignedVInt32(); + V[] array = allocator.apply(size); + for (int i = 0 ; i < array.length ; ++i) + array[i] = serializer.deserialize(in, version); + return new SortedArrayList<>(array); + } + public static <V> List<V> deserializeList(DataInputPlus in, int version, IVersionedSerializer<V> serializer) throws IOException { return deserializeCollection(in, version, serializer, newArrayList()); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index 493283bdae..3e1f75f62a 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import accord.primitives.Routable; +import accord.local.CommandStores; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.shared.WithProperties; import org.apache.cassandra.service.accord.*; @@ -418,7 +419,9 @@ public class CompactionAccordIteratorsTest Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); if (redundantBefore != null) redundantBefores.put(commandStore.id(), redundantBefore); - when(mockAccordService.getRedundantBeforesAndDurableBefore()).thenReturn(Pair.create(redundantBefores, durableBefore)); + Int2ObjectHashMap<CommandStores.RangesForEpoch> rangesForEpochs = new Int2ObjectHashMap<>(); + rangesForEpochs.put(commandStore.id(), commandStore.unsafeRangesForEpoch()); + when(mockAccordService.getCompactionInfo()).thenReturn(new IAccordService.CompactionInfo(redundantBefores, rangesForEpochs, durableBefore)); return mockAccordService; } @@ -463,7 +466,7 @@ public class CompactionAccordIteratorsTest for (TxnId txnId : txnIds) { Txn txn = txnId.kind().isWrite() ? writeTxn : readTxn; - PartialDeps partialDeps = Deps.NONE.slice(AccordTestUtils.fullRange(txn)); + PartialDeps partialDeps = Deps.NONE.intersecting(AccordTestUtils.fullRange(txn)); PartialTxn partialTxn = txn.slice(commandStore.unsafeRangesForEpoch().currentRanges(), true); PartialRoute<?> partialRoute = route.slice(commandStore.unsafeRangesForEpoch().currentRanges()); getUninterruptibly(commandStore.execute(contextFor(txnId, txn.keys(), COMMANDS), safe -> { @@ -559,7 +562,7 @@ public class CompactionAccordIteratorsTest scanners.add(random.nextInt(scanners.size()), new Scanner(cfs.metadata(), outputPartitions.stream().map(Partition::unfilteredIterator).collect(Collectors.toList()))); } while (!scanners.isEmpty()); - verify(mockAccordService, times(singleCompaction || numScanners == 1 ? 1 : numScanners - 1)).getRedundantBeforesAndDurableBefore(); + verify(mockAccordService, times(singleCompaction || numScanners == 1 ? 1 : numScanners - 1)).getCompactionInfo(); return result; } } diff --git a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java index c4085c6083..59a7cf437d 100644 --- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java +++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java @@ -464,7 +464,7 @@ public class AccordIndexStressTest extends CQLTester var token = new Murmur3Partitioner.LongToken(rs.nextInt(minToken, maxToken)); keys.add(new TokenKey(table, token)); } - return new FullKeyRoute(keys.first(), true, keys.toArray(RoutingKey[]::new)); + return new FullKeyRoute(keys.first(), keys.toArray(RoutingKey[]::new)); } case Range: { diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index 5200316a2f..17de14e415 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -259,7 +259,7 @@ public class RouteIndexTest extends CQLTester.InMemory var token = new LongToken(state.tokenGen.nextInt(rs)); keys.add(new TokenKey(table, token)); } - return new FullKeyRoute(keys.first(), true, keys.toArray(RoutingKey[]::new)); + return new FullKeyRoute(keys.first(), keys.toArray(RoutingKey[]::new)); } case Range: { diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java index bcc1f8bf11..dde88ca468 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java @@ -41,6 +41,7 @@ import accord.primitives.Ballot; import accord.primitives.PartialDeps; import accord.primitives.PartialTxn; import accord.primitives.Range; +import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.Route; import accord.primitives.RoutingKeys; @@ -113,7 +114,7 @@ public class AccordCommandStoreTest TxnId txnId = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write, Routable.Domain.Range); PartialDeps dependencies; - try (PartialDeps.Builder builder = PartialDeps.builder(depTxn.covering())) + try (PartialDeps.Builder builder = PartialDeps.builder(Ranges.of(range))) { builder.add(range, oldTxnId1); builder.add(range, oldTxnId2); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index a9bdd7aa52..70a66f2be4 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -101,7 +101,7 @@ public class AccordCommandTest RoutingKey homeKey = key.toUnseekable(); FullRoute<?> fullRoute = txn.keys().toRoute(homeKey); PartialRoute<?> route = fullRoute.slice(fullRange(txn)); - PartialTxn partialTxn = txn.slice(route.covering(), true); + PartialTxn partialTxn = txn.intersecting(route, true); PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route, 1, 1, false, 1, partialTxn, fullRoute); // Check preaccept @@ -136,7 +136,7 @@ public class AccordCommandTest TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1); PartialDeps deps; - try (PartialDeps.Builder builder = PartialDeps.builder(route.covering())) + try (PartialDeps.Builder builder = PartialDeps.builder(route)) { builder.add(key, txnId2); deps = builder.build(); @@ -193,7 +193,7 @@ public class AccordCommandTest RoutingKey homeKey = key.toUnseekable(); FullRoute<?> fullRoute = txn.keys().toRoute(homeKey); PartialRoute<?> route = fullRoute.slice(fullRange(txn)); - PartialTxn partialTxn = txn.slice(route.covering(), true); + PartialTxn partialTxn = txn.intersecting(route, true); PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1, route, 1, 1, false, 1, partialTxn, fullRoute); getUninterruptibly(commandStore.execute(preAccept1, safeStore -> { diff --git a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java index 2f689187ac..94e181a853 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.Set; import java.util.UUID; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.junit.Assert; @@ -38,6 +37,7 @@ import accord.local.Node; import accord.local.Node.Id; import accord.topology.Shard; import accord.topology.Topology; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.concurrent.ScheduledExecutors; @@ -71,7 +71,7 @@ public class AccordConfigurationServiceTest private static final Id ID1 = new Id(1); private static final Id ID2 = new Id(2); private static final Id ID3 = new Id(3); - private static final List<Id> ID_LIST = ImmutableList.of(ID1, ID2, ID3); + private static final SortedArrayList<Id> ID_LIST = new SortedArrayList<>(new Id[] { ID1, ID2, ID3 }); private static final Set<Id> ID_SET = ImmutableSet.copyOf(ID_LIST); private static final TableId TBL1 = TableId.fromUUID(new UUID(0, 1)); private static final TableId TBL2 = TableId.fromUUID(new UUID(0, 2)); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java index d6d2dab96b..717cda1de3 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java @@ -113,7 +113,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory CommonAttributes.Mutable common = new CommonAttributes.Mutable(id); common.partialTxn(partialTxn); common.route(route); - common.partialDeps(deps.slice(scope)); + common.partialDeps(deps.intersecting(scope)); common.durability(Status.Durability.NotDurable); Command.WaitingOn waitingOn = null; diff --git a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java index 47ba369a09..1ece95f164 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java @@ -88,8 +88,9 @@ public class AccordMessageSinkTest long epoch = 42; Txn txn = Utils.readTxn(Keys.of(IntKey.key(42))); TxnId id = nextTxnId(epoch, txn); - PartialTxn partialTxn = txn.slice(Ranges.of(IntKey.range(40, 50)), true); - Request request = new AbstractFetchCoordinator.FetchRequest(epoch, id, partialTxn.covering(), PartialDeps.NONE, partialTxn); + Ranges ranges = Ranges.of(IntKey.range(40, 50)); + PartialTxn partialTxn = txn.slice(ranges, true); + Request request = new AbstractFetchCoordinator.FetchRequest(epoch, id, ranges, PartialDeps.NONE, partialTxn); checkRequestReplies(request, new AbstractFetchCoordinator.FetchResponse(null, null, id), diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 0f28c6c7c2..ffd0d31186 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -32,8 +32,9 @@ import java.util.stream.IntStream; import javax.annotation.Nullable; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; @@ -157,7 +158,7 @@ public class AccordTestUtils { Seekable key = txn.keys().get(0); RoutingKey routingKey = key.asKey().toUnseekable(); - return new FullKeyRoute(routingKey, true, new RoutingKey[]{ routingKey }); + return new FullKeyRoute(routingKey, new RoutingKey[]{ routingKey }); } } @@ -347,7 +348,7 @@ public class AccordTestUtils { Txn txn = createTxn(key, key); Ranges ranges = fullRange(txn); - return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(), txn.read(), txn.query(), txn.update()); + return new PartialTxn.InMemory(txn.kind(), txn.keys(), txn.read(), txn.query(), txn.update()); } private static class SingleEpochRanges extends CommandStore.EpochUpdateHolder @@ -370,7 +371,7 @@ public class AccordTestUtils TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); TokenRange range = TokenRange.fullRange(metadata.id); Node.Id node = new Id(1); - Topology topology = new Topology(1, new Shard(range, Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet())); + Topology topology = new Topology(1, new Shard(range, new SortedArrayList<>(new Id[] { node }), Sets.newHashSet(node), Collections.emptySet())); NodeTimeService time = new NodeTimeService() { @Override public Id id() { return node;} @@ -437,7 +438,7 @@ public class AccordTestUtils TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); TokenRange range = TokenRange.fullRange(metadata.id); Node.Id node = new Id(1); - Topology topology = new Topology(1, new Shard(range, Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet())); + Topology topology = new Topology(1, new Shard(range, new SortedArrayList<>(new Id[] { node }), Sets.newHashSet(node), Collections.emptySet())); AccordCommandStore store = createAccordCommandStore(node, now, topology, loadExecutor, saveExecutor); store.execute(PreLoadContext.empty(), safeStore -> ((AccordCommandStore)safeStore.commandStore()).setCapacity(1 << 20)); return store; @@ -480,9 +481,9 @@ public class AccordTestUtils return new Node.Id(id); } - public static List<Node.Id> idList(int... ids) + public static SortedArrayList<Id> idList(int... ids) { - return Arrays.stream(ids).mapToObj(AccordTestUtils::id).collect(Collectors.toList()); + return new SortedArrayList<>(Arrays.stream(ids).mapToObj(AccordTestUtils::id).toArray(Id[]::new)); } public static Set<Id> idSet(int... ids) diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java index c3c8e6b588..e4a2a4791c 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java @@ -24,10 +24,10 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import accord.local.Node; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.Range; @@ -53,7 +53,7 @@ public class AccordTopologyUtils public static final Node.Id ID1 = new Node.Id(1); public static final Node.Id ID2 = new Node.Id(2); public static final Node.Id ID3 = new Node.Id(3); - public static final List<Node.Id> NODE_LIST = ImmutableList.of(ID1, ID2, ID3); + public static final SortedArrayList<Node.Id> NODE_LIST = new SortedArrayList<>(new Node.Id[] { ID1, ID2, ID3 }); public static final Set<Node.Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST); private static final IPartitioner partitioner = Murmur3Partitioner.instance; diff --git a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java index 65650274fa..cda86eff8b 100644 --- a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java +++ b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java @@ -19,7 +19,6 @@ package org.apache.cassandra.service.accord.fastpath; import java.util.HashMap; -import java.util.List; import java.util.Map; import com.google.common.collect.ImmutableMap; @@ -27,6 +26,7 @@ import org.junit.Assert; import org.junit.Test; import accord.local.Node; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.accord.fastpath.ParameterizedFastPathStrategy.WeightedDc; @@ -39,7 +39,7 @@ import static org.junit.Assert.assertEquals; public class ParameterizedFastPathStrategyTest { - private static final List<Node.Id> NODES = idList(1, 2, 3, 4, 5, 6); + private static final SortedArrayList<Node.Id> NODES = idList(1, 2, 3, 4, 5, 6); private static final Map<Node.Id, String> DCS_2; private static final Map<Node.Id, String> DCS_3; diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java index d5da34c8fb..d966757e95 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java @@ -93,7 +93,7 @@ public class CheckStatusSerializersTest List<AccordRoutingKey> forOrdering = Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs); forOrdering.sort(Comparator.naturalOrder()); // TODO (coverage): don't hard code keys type - keysOrRanges = new FullKeyRoute(homeKey, forOrdering.contains(homeKey), forOrdering.toArray(RoutingKey[]::new)); + keysOrRanges = new FullKeyRoute(homeKey, forOrdering.toArray(RoutingKey[]::new)); break; case Range: keysOrRanges = AccordGenerators.ranges(Murmur3Partitioner.instance).next(rs); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index 33244f045b..81462c0e65 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -183,7 +183,7 @@ public class CommandsForKeySerializerTest else return Command.SerializerSupport.truncatedApply(attributes(), saveStatus, executeAt, new Writes(txnId, executeAt, txn.keys(), new TxnWrite(Collections.emptyList(), true)), new TxnData()); case Erased: - case ErasedOrInvalidated: + case ErasedOrInvalidOrVestigial: case Invalidated: return Command.SerializerSupport.invalidated(txnId, Listeners.Immutable.EMPTY); } diff --git a/test/unit/org/apache/cassandra/utils/RangeTreeTest.java b/test/unit/org/apache/cassandra/utils/RangeTreeTest.java index 6ef5325d1d..0f0208102c 100644 --- a/test/unit/org/apache/cassandra/utils/RangeTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/RangeTreeTest.java @@ -544,7 +544,7 @@ public class RangeTreeTest { List<Map.Entry<Range, Integer>> matches = new ArrayList<>(); // find ranges, then add the values - list.forEach(range, (a, b, c, d, idx) -> { + list.forEachRange(range, (a, b, c, d, idx) -> { Range match = ranges[idx]; map.get(match).forEachInt(v -> matches.add(Map.entry(match, v))); }, (a, b, c, d, start, end) -> { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
