This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch burn-test-stability in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 12ba2c2eb3bdc6ffd23994ffc94be5e5971af44b Author: Benedict Elliott Smith <[email protected]> AuthorDate: Fri Aug 2 22:45:12 2024 +0100 fix some compilation issues --- modules/accord | 2 +- .../cassandra/db/compaction/CompactionIterator.java | 15 +++++++++------ .../cassandra/service/accord/AccordService.java | 11 +++++++---- .../cassandra/service/accord/AccordTopology.java | 15 +++++++++------ .../cassandra/service/accord/IAccordService.java | 18 ++++++++++++++++-- .../service/accord/fastpath/FastPathStrategy.java | 4 +++- .../fastpath/InheritKeyspaceFastPathStrategy.java | 4 +++- .../fastpath/ParameterizedFastPathStrategy.java | 11 ++++++----- .../accord/fastpath/SimpleFastPathStrategy.java | 20 ++++++++++++-------- .../db/compaction/CompactionAccordIteratorsTest.java | 4 ++-- .../cassandra/service/accord/AccordTestUtils.java | 7 +++++-- .../fastpath/ParameterizedFastPathStrategyTest.java | 4 +++- 12 files changed, 76 insertions(+), 39 deletions(-) diff --git a/modules/accord b/modules/accord index ce6e7a514d..5e80957a6d 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit ce6e7a514de1d4b6ad1c290df423b831664fd535 +Subproject commit 5e80957a6d6c29f993ed9239432fef3d2439983a diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index e0dcb5682f..6e56bd8441 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Ordering; import accord.local.Cleanup; +import accord.local.CommandStores.RangesForEpoch; import accord.local.DurableBefore; import accord.local.RedundantBefore; import accord.local.SaveStatus; @@ -784,6 +785,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte class AccordCommandsPurger extends AbstractPurger { final Int2ObjectHashMap<RedundantBefore> redundantBefores; + final Int2ObjectHashMap<RangesForEpoch> ranges; final DurableBefore durableBefore; int storeId; @@ -791,9 +793,10 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordCommandsPurger(Supplier<IAccordService> accordService) { - Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> redundantBeforesAndDurableBefore = accordService.get().getRedundantBeforesAndDurableBefore(); - this.redundantBefores = redundantBeforesAndDurableBefore.left; - this.durableBefore = redundantBeforesAndDurableBefore.right; + IAccordService.CompactionInfo compactionInfo = accordService.get().getCompactionInfo(); + this.redundantBefores = compactionInfo.redundantBefores; + this.ranges = compactionInfo.ranges; + this.durableBefore = compactionInfo.durableBefore; } protected void beginPartition(UnfilteredRowIterator partition) @@ -815,7 +818,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte // When commands end up being sliced by compaction we need this to discard tombstones and slices // without enough information to run the rest of the cleanup logic - if (Cleanup.isSafeToCleanup(durableBefore, txnId)) + if (Cleanup.isSafeToCleanup(durableBefore, txnId, ranges.get(storeId).allAt(txnId.epoch()))) return null; Cell durabilityCell = row.getCell(CommandsColumns.durability); @@ -878,7 +881,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordTimestampsForKeyPurger(Supplier<IAccordService> accordService) { - this.redundantBefores = accordService.get().getRedundantBeforesAndDurableBefore().left; + this.redundantBefores = accordService.get().getCompactionInfo().redundantBefores; } protected void beginPartition(UnfilteredRowIterator partition) @@ -954,7 +957,7 @@ public class CompactionIterator extends CompactionInfo.Holder implements Unfilte AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor, Supplier<IAccordService> accordService) { this.accessor = accessor; - this.redundantBefores = accordService.get().getRedundantBeforesAndDurableBefore().left; + this.redundantBefores = accordService.get().getCompactionInfo().redundantBefores; } protected void beginPartition(UnfilteredRowIterator partition) diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 752e041f48..75450fff2f 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -45,6 +45,7 @@ import accord.coordinate.Exhausted; import accord.coordinate.FailureAccumulator; import accord.coordinate.TopologyMismatch; import accord.impl.CoordinateDurabilityScheduling; +import accord.local.CommandStores; import accord.primitives.SyncPoint; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.statements.RequestValidations; @@ -226,9 +227,9 @@ public class AccordService implements IAccordService, Shutdownable public void receive(Message<List<AccordSyncPropagator.Notification>> message) {} @Override - public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore() + public CompactionInfo getCompactionInfo() { - return Pair.create(new Int2ObjectHashMap<>(), DurableBefore.EMPTY); + return new CompactionInfo(new Int2ObjectHashMap<>(), new Int2ObjectHashMap<>(), DurableBefore.EMPTY); } }; @@ -773,17 +774,19 @@ public class AccordService implements IAccordService, Shutdownable } @Override - public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore() + public CompactionInfo getCompactionInfo() { Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); + Int2ObjectHashMap<CommandStores.RangesForEpoch>ranges = new Int2ObjectHashMap<>(); AtomicReference<DurableBefore> durableBefore = new AtomicReference<>(DurableBefore.EMPTY); AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> { synchronized (redundantBefores) { redundantBefores.put(safeStore.commandStore().id(), safeStore.commandStore().redundantBefore()); + ranges.put(safeStore.commandStore().id(), safeStore.ranges()); } durableBefore.set(DurableBefore.merge(durableBefore.get(), safeStore.commandStore().durableBefore())); })); - return Pair.create(redundantBefores, durableBefore.get()); + return new CompactionInfo(redundantBefores, ranges, durableBefore.get()); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index 0814c322d5..38830c5631 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -32,6 +32,9 @@ import accord.local.Node; import accord.topology.Shard; import accord.topology.Topology; import accord.utils.Invariants; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; +import accord.utils.SortedList; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.Range; @@ -64,7 +67,7 @@ public class AccordTopology private static class ShardLookup extends HashMap<accord.primitives.Range, Shard> { - private Shard createOrReuse(accord.primitives.Range range, List<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id> joining) + private Shard createOrReuse(accord.primitives.Range range, SortedArrayList<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id> joining) { Shard prev = get(range); if (prev != null @@ -81,10 +84,10 @@ public class AccordTopology { private final KeyspaceMetadata keyspace; private final Range<Token> range; - private final List<Node.Id> nodes; + private final SortedArrayList<Node.Id> nodes; private final Set<Node.Id> pending; - private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range, List<Node.Id> nodes, Set<Node.Id> pending) + private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range, SortedArrayList<Node.Id> nodes, Set<Node.Id> pending) { this.keyspace = keyspace; this.range = range; @@ -106,7 +109,7 @@ public class AccordTopology { TokenRange tokenRange = AccordTopology.range(metadata.id, range); - Set<Node.Id> fastPath = strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap); + SortedArrayList<Node.Id> fastPath = strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap); return lookup.createOrReuse(tokenRange, nodes, fastPath, pending); } @@ -122,10 +125,10 @@ public class AccordTopology Sets.SetView<InetAddressAndPort> readOnly = Sets.difference(readEndpoints, writeEndpoints); Invariants.checkState(readOnly.isEmpty(), "Read only replicas detected: %s", readOnly); - List<Node.Id> nodes = writes.endpoints().stream() + SortedArrayList<Node.Id> nodes = new SortedArrayList<>(writes.endpoints().stream() .map(directory::peerId) .map(AccordTopology::tcmIdToAccord) - .sorted().collect(Collectors.toList()); + .sorted().toArray(Node.Id[]::new)); Set<Node.Id> pending = readEndpoints.equals(writeEndpoints) ? Collections.emptySet() : diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index ae0d08c48e..828ca1f287 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord; import accord.api.BarrierType; +import accord.local.CommandStores; import accord.local.DurableBefore; import accord.local.Node.Id; import accord.local.RedundantBefore; @@ -40,7 +41,6 @@ import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import org.apache.cassandra.service.accord.api.AccordScheduler; import org.apache.cassandra.service.accord.txn.TxnResult; import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Future; import javax.annotation.Nonnull; @@ -108,10 +108,24 @@ public interface IAccordService void receive(Message<List<AccordSyncPropagator.Notification>> message); + class CompactionInfo + { + public final Int2ObjectHashMap<RedundantBefore> redundantBefores; + public final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges; + public final DurableBefore durableBefore; + + public CompactionInfo(Int2ObjectHashMap<RedundantBefore> redundantBefores, Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges, DurableBefore durableBefore) + { + this.redundantBefores = redundantBefores; + this.ranges = ranges; + this.durableBefore = durableBefore; + } + } + /** * Fetch the redundnant befores for every command store */ - Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore> getRedundantBeforesAndDurableBefore(); + CompactionInfo getCompactionInfo(); default Id nodeId() { throw new UnsupportedOperationException(); } } diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java index 6978721a81..419c666f8b 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java @@ -29,6 +29,8 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableMap; import accord.local.Node; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus; @@ -83,7 +85,7 @@ public interface FastPathStrategy * @param dcMap * @return */ - Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap); + SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap); Kind kind(); diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java index 08b7763a93..d7501bd10e 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java @@ -25,6 +25,8 @@ import java.util.Set; import com.google.common.collect.ImmutableMap; import accord.local.Node; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; public class InheritKeyspaceFastPathStrategy implements FastPathStrategy { @@ -35,7 +37,7 @@ public class InheritKeyspaceFastPathStrategy implements FastPathStrategy private InheritKeyspaceFastPathStrategy() {} @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { throw new IllegalStateException("InheritKeyspaceFastPathStrategy should be replaced before calculateFastPath is called"); } diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java index aef562f99a..05e7b49634 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java @@ -38,6 +38,8 @@ import accord.api.VisibleForImplementation; import accord.local.Node; import accord.topology.Shard; import accord.utils.Invariants; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputPlus; @@ -204,7 +206,7 @@ public class ParameterizedFastPathStrategy implements FastPathStrategy } @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { List<NodeSorter> sorters = new ArrayList<>(nodes.size()); @@ -221,12 +223,11 @@ public class ParameterizedFastPathStrategy implements FastPathStrategy int slowQuorum = Shard.slowPathQuorumSize(nodes.size()); int fpSize = Math.max(size, slowQuorum); - ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder(); - + Node.Id[] array = new Node.Id[fpSize]; for (int i=0; i<fpSize; i++) - builder.add(sorters.get(i).id); + array[i] = sorters.get(i).id; - ImmutableSet<Node.Id> fastPath = builder.build(); + SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array); Invariants.checkState(fastPath.size() >= slowQuorum); return fastPath; } diff --git a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java index 3f2e27bdc8..39e32453c9 100644 --- a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java +++ b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java @@ -27,7 +27,10 @@ import com.google.common.collect.ImmutableSet; import accord.local.Node; import accord.topology.Shard; +import accord.utils.ArrayBuffers; import accord.utils.Invariants; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; public class SimpleFastPathStrategy implements FastPathStrategy { @@ -38,26 +41,27 @@ public class SimpleFastPathStrategy implements FastPathStrategy private SimpleFastPathStrategy() {} @Override - public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) + public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap) { int maxFailures = Shard.maxToleratedFailures(nodes.size()); int discarded = 0; - ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder(); + if (unavailable.isEmpty()) + return nodes; + Object[] tmp = ArrayBuffers.cachedAny().get(nodes.size()); for (int i=0,mi=nodes.size(); i<mi; i++) { Node.Id node = nodes.get(i); if (unavailable.contains(node) && discarded < maxFailures) - { discarded++; - continue; - } - - builder.add(node); + else + tmp[i - discarded] = node; } - Set<Node.Id> fastPath = builder.build(); + Node.Id[] array = new Node.Id[nodes.size() - discarded]; + System.arraycopy(tmp, 0, array, 0, nodes.size() - discarded); + SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array); Invariants.checkState(fastPath.size() >= Shard.slowPathQuorumSize(nodes.size())); return fastPath; } diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java index 99e6da378c..871cd584eb 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java @@ -415,7 +415,7 @@ public class CompactionAccordIteratorsTest Int2ObjectHashMap<RedundantBefore> redundantBefores = new Int2ObjectHashMap<>(); if (redundantBefore != null) redundantBefores.put(commandStore.id(), redundantBefore); - when(mockAccordService.getRedundantBeforesAndDurableBefore()).thenReturn(Pair.create(redundantBefores, durableBefore)); + when(mockAccordService.getCompactionInfo()).thenReturn(Pair.create(redundantBefores, durableBefore)); return mockAccordService; } @@ -556,7 +556,7 @@ public class CompactionAccordIteratorsTest scanners.add(random.nextInt(scanners.size()), new Scanner(cfs.metadata(), outputPartitions.stream().map(Partition::unfilteredIterator).collect(Collectors.toList()))); } while (!scanners.isEmpty()); - verify(mockAccordService, times(singleCompaction || numScanners == 1 ? 1 : numScanners - 1)).getRedundantBeforesAndDurableBefore(); + verify(mockAccordService, times(singleCompaction || numScanners == 1 ? 1 : numScanners - 1)).getCompactionInfo(); return result; } } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 0f28c6c7c2..3fe6842bcd 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -34,6 +34,9 @@ import javax.annotation.Nullable; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.util.File; @@ -480,9 +483,9 @@ public class AccordTestUtils return new Node.Id(id); } - public static List<Node.Id> idList(int... ids) + public static SortedArrayList<Id> idList(int... ids) { - return Arrays.stream(ids).mapToObj(AccordTestUtils::id).collect(Collectors.toList()); + return new SortedArrayList<>(Arrays.stream(ids).mapToObj(AccordTestUtils::id).toArray(Id[]::new)); } public static Set<Id> idSet(int... ids) diff --git a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java index 65650274fa..6801bca270 100644 --- a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java +++ b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java @@ -27,6 +27,8 @@ import org.junit.Assert; import org.junit.Test; import accord.local.Node; +import accord.utils.SortedArrays; +import accord.utils.SortedArrays.SortedArrayList; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.accord.fastpath.ParameterizedFastPathStrategy.WeightedDc; @@ -39,7 +41,7 @@ import static org.junit.Assert.assertEquals; public class ParameterizedFastPathStrategyTest { - private static final List<Node.Id> NODES = idList(1, 2, 3, 4, 5, 6); + private static final SortedArrayList<Node.Id> NODES = idList(1, 2, 3, 4, 5, 6); private static final Map<Node.Id, String> DCS_2; private static final Map<Node.Id, String> DCS_3; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
