This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch cassandra-6.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9af2b2cdf8d8c4a3088673b9d67b0e97518c48e8 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Mar 16 15:04:41 2026 +0100 Improve performance deserializing cluster metadata Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-21224 --- CHANGES.txt | 1 + .../apache/cassandra/tcm/membership/Directory.java | 167 +++++++++++++-------- .../microbench/DirectorySerializationBench.java | 126 ++++++++++++++++ 3 files changed, 234 insertions(+), 60 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 86f388f22d..ff76806971 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 6.0-alpha2 + * Improve performance when deserializing cluster metadata (CASSANDRA-21224) * Minor TokenMap performance improvement (CASSANDRA-21223) * Handle lost response when committing PrepareMove (CASSANDRA-21222) * SEPExecutor.maybeExecuteImmediately does not always execute tasks immediately despite available worker capacity (CASSANDRA-21429) diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 8c7942a1fd..55a5e7ac96 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -50,7 +50,6 @@ import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataValue; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.UUIDSerializer; import org.apache.cassandra.utils.btree.BTreeBiMap; import org.apache.cassandra.utils.btree.BTreeMap; @@ -58,6 +57,7 @@ import org.apache.cassandra.utils.btree.BTreeMultimap; import static org.apache.cassandra.db.TypeSizes.sizeof; import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT; +import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION; public class Directory implements MetadataValue<Directory> { @@ -106,6 +106,41 @@ public class Directory implements MetadataValue<Directory> BTreeMap<NodeId, NodeAddresses> addresses, BTreeMultimap<String, InetAddressAndPort> endpointsByDC, BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC) + { + this(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC, clusterVersions(states, versions)); + } + + private Directory(int nextId, + Epoch lastModified, + BTreeBiMap<NodeId, InetAddressAndPort> peers, + BTreeSet<RemovedNode> removedNodes, + BTreeMap<NodeId, Location> locations, + BTreeMap<NodeId, NodeState> states, + BTreeMap<NodeId, NodeVersion> versions, + BTreeBiMap<NodeId, UUID> hostIds, + BTreeMap<NodeId, NodeAddresses> addresses, + BTreeMultimap<String, InetAddressAndPort> endpointsByDC, + BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC, + ClusterVersions clusterVersions) + { + this(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC, + clusterVersions.clusterMinVersion, clusterVersions.clusterMaxVersion, clusterVersions.commonSerializationVersion); + } + + private Directory(int nextId, + Epoch lastModified, + BTreeBiMap<NodeId, InetAddressAndPort> peers, + BTreeSet<RemovedNode> removedNodes, + BTreeMap<NodeId, Location> locations, + BTreeMap<NodeId, NodeState> states, + BTreeMap<NodeId, NodeVersion> versions, + BTreeBiMap<NodeId, UUID> hostIds, + BTreeMap<NodeId, NodeAddresses> addresses, + BTreeMultimap<String, InetAddressAndPort> endpointsByDC, + BTreeMap<String, Multimap<String, InetAddressAndPort>> racksByDC, + NodeVersion clusterMinVersion, + NodeVersion clusterMaxVersion, + Version commonSerializationVersion) { this.nextId = nextId; this.lastModified = lastModified; @@ -118,10 +153,9 @@ public class Directory implements MetadataValue<Directory> this.addresses = addresses; this.endpointsByDC = endpointsByDC; this.racksByDC = racksByDC; - Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states, versions); - clusterMinVersion = minMaxVer.left; - clusterMaxVersion = minMaxVer.right; - commonSerializationVersion = minCommonSerializationVersion(states, versions); + this.clusterMinVersion = clusterMinVersion; + this.clusterMaxVersion = clusterMaxVersion; + this.commonSerializationVersion = commonSerializationVersion; } @Override @@ -161,7 +195,7 @@ public class Directory implements MetadataValue<Directory> @Override public Directory withLastModified(Epoch epoch) { - return new Directory(nextId, epoch, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC); + return new Directory(nextId, epoch, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC, racksByDC, clusterMinVersion, clusterMaxVersion, commonSerializationVersion); } public Directory withNonUpgradedNode(NodeAddresses addresses, @@ -250,9 +284,18 @@ public class Directory implements MetadataValue<Directory> BTreeMap<String, Multimap<String, InetAddressAndPort>> updatedEndpointsByRack = racksByDC.withForce(location(id).datacenter, rackEP); return new Directory(nextId, lastModified, - peers.withForce(id,nodeAddresses.broadcastAddress), removedNodes, locations, states, versions, hostIds, addresses.withForce(id, nodeAddresses), + peers.withForce(id, nodeAddresses.broadcastAddress), + removedNodes, + locations, + states, + versions, + hostIds, + addresses.withForce(id, nodeAddresses), updatedEndpointsByDC, - updatedEndpointsByRack); + updatedEndpointsByRack, + clusterMinVersion, + clusterMaxVersion, + commonSerializationVersion); } public Directory withRackAndDC(NodeId id) @@ -266,7 +309,10 @@ public class Directory implements MetadataValue<Directory> return new Directory(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC.with(location.datacenter, endpoint), - racksByDC.withForce(location.datacenter, rackEP)); + racksByDC.withForce(location.datacenter, rackEP), + clusterMinVersion, + clusterMaxVersion, + commonSerializationVersion); } public Directory withoutRackAndDC(NodeId id) @@ -286,7 +332,10 @@ public class Directory implements MetadataValue<Directory> newRacksByDC = racksByDC.withForce(location.datacenter, rackEP); return new Directory(nextId, lastModified, peers, removedNodes, locations, states, versions, hostIds, addresses, endpointsByDC.without(location.datacenter, endpoint), - newRacksByDC); + newRacksByDC, + clusterMinVersion, + clusterMaxVersion, + commonSerializationVersion); } public Directory withUpdatedRackAndDc(NodeId id, Location location) @@ -306,23 +355,7 @@ public class Directory implements MetadataValue<Directory> return this; return new Directory(nextId, lastModified, peers, removedNodes, locations.withForce(id, location), states, versions, hostIds, - addresses, endpointsByDC, racksByDC); - } - - public Directory removed(Epoch removedIn, NodeId id, InetAddressAndPort addr) - { - Invariants.require(!peers.containsKey(id)); - return new Directory(nextId, - lastModified, - peers, - removedNodes.with(new RemovedNode(removedIn, id, addr)), - locations, - states, - versions, - hostIds, - addresses, - endpointsByDC, - racksByDC); + addresses, endpointsByDC, racksByDC, clusterMinVersion, clusterMaxVersion, commonSerializationVersion); } public Directory without(Epoch removedIn, NodeId id) @@ -641,14 +674,23 @@ public class Directory implements MetadataValue<Directory> if (version.isAtLeast(Version.V1)) nextId = in.readInt(); int count = in.readInt(); - Directory newDir = new Directory(); + BTreeBiMap<NodeId, InetAddressAndPort> peers = BTreeBiMap.empty(); + BTreeMap<NodeId, Location> locations = BTreeMap.empty(); + BTreeMap<NodeId, NodeState> states = BTreeMap.empty(); + BTreeMap<NodeId, NodeVersion> versions = BTreeMap.empty(); + BTreeBiMap<NodeId, UUID> hostIds = BTreeBiMap.empty(); + BTreeMap<NodeId, NodeAddresses> addresses = BTreeMap.empty(); for (int i = 0; i < count; i++) { Node n = Node.serializer.deserialize(in, version); - // todo: bulk operations - newDir = newDir.with(n.addresses, n.id, n.hostId, n.location, n.version) - .withNodeState(n.id, n.state); + NodeId id = n.id; + peers = peers.withForce(id, n.addresses.broadcastAddress); + locations = locations.withForce(id, n.location); + states = states.withForce(id, n.state); + versions = versions.withForce(id, n.version); + hostIds = hostIds.withForce(id, n.hostId); + addresses = addresses.withForce(id, n.addresses); } int dcCount = in.readInt(); @@ -677,7 +719,7 @@ public class Directory implements MetadataValue<Directory> if (version.isBefore(Version.V1)) { NodeId maxId = null; - for (NodeId id : newDir.peers.keySet()) + for (NodeId id : peers.keySet()) { if (maxId == null || id.compareTo(maxId) > 0) maxId = id; @@ -688,7 +730,7 @@ public class Directory implements MetadataValue<Directory> else nextId = maxId.id() + 1; } - + BTreeSet<RemovedNode> removed = BTreeSet.empty(RemovedNode::compareTo); if (version.isAtLeast(Version.V7)) { int removedNodes = in.readInt(); @@ -697,19 +739,20 @@ public class Directory implements MetadataValue<Directory> long epoch = in.readLong(); NodeId nodeId = NodeId.serializer.deserialize(in, version); InetAddressAndPort addr = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); - newDir.removed(Epoch.create(epoch), nodeId, addr); + Invariants.require(!peers.containsKey(nodeId)); + removed = removed.with(new RemovedNode(Epoch.create(epoch), nodeId, addr)); } } return new Directory(nextId, lastModified, - newDir.peers, - newDir.removedNodes, - newDir.locations, - newDir.states, - newDir.versions, - newDir.hostIds, - newDir.addresses, + peers, + removed, + locations, + states, + versions, + hostIds, + addresses, dcEndpoints, racksByDC); } @@ -769,10 +812,11 @@ public class Directory implements MetadataValue<Directory> equivalentTo(directory); } - private static Pair<NodeVersion, NodeVersion> minMaxVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions) + private static ClusterVersions clusterVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions) { NodeVersion minVersion = null; NodeVersion maxVersion = null; + int commonVersion = Integer.MAX_VALUE; for (Map.Entry<NodeId, NodeState> entry : states.entrySet()) { if (entry.getValue() != NodeState.LEFT) @@ -782,26 +826,15 @@ public class Directory implements MetadataValue<Directory> minVersion = ver; if (maxVersion == null || ver.compareTo(maxVersion) > 0) maxVersion = ver; - } - } - if (minVersion == null) - return Pair.create(CURRENT, CURRENT); - return Pair.create(minVersion, maxVersion); - } - - public static Version minCommonSerializationVersion(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions) - { - int commonVersion = Integer.MAX_VALUE; - for (Map.Entry<NodeId, NodeState> entry : states.entrySet()) - { - if (entry.getValue() != NodeState.LEFT) - { - NodeVersion ver = versions.get(entry.getKey()); if (ver.serializationVersion > Version.OLD.asInt() && ver.serializationVersion < commonVersion) commonVersion = ver.serializationVersion; } } - return commonVersion == Integer.MAX_VALUE ? NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion); + if (minVersion == null) + return new ClusterVersions(CURRENT, CURRENT, CURRENT_METADATA_VERSION); + + return new ClusterVersions(minVersion, maxVersion, + commonVersion == Integer.MAX_VALUE ? NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion)); } @Override @@ -825,7 +858,8 @@ public class Directory implements MetadataValue<Directory> Objects.equals(endpointsByDC, directory.endpointsByDC) && Objects.equals(racksByDC, directory.racksByDC) && Objects.equals(versions, directory.versions) && - Objects.equals(addresses, directory.addresses); + Objects.equals(addresses, directory.addresses) && + Objects.equals(removedNodes, directory.removedNodes); } private static final Logger logger = LoggerFactory.getLogger(Directory.class); @@ -891,7 +925,6 @@ public class Directory implements MetadataValue<Directory> } - public static class RemovedNode implements Comparable<RemovedNode> { public final Epoch removedIn; @@ -923,4 +956,18 @@ public class Directory implements MetadataValue<Directory> return id.compareTo(o.id); } } + + private static class ClusterVersions + { + private final NodeVersion clusterMinVersion; + private final NodeVersion clusterMaxVersion; + private final Version commonSerializationVersion; + public ClusterVersions(NodeVersion clusterMinVersion, NodeVersion clusterMaxVersion, Version commonSerializationVersion) + { + + this.clusterMinVersion = clusterMinVersion; + this.clusterMaxVersion = clusterMaxVersion; + this.commonSerializationVersion = commonSerializationVersion; + } + } } diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java new file mode 100644 index 0000000000..5b9bf7bd69 --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.RegistrationStatus; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.PlacementProvider; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.transformations.UnsafeJoin; +import org.apache.cassandra.utils.CassandraVersion; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 30000) +public class DirectorySerializationBench +{ + static Random random = new Random(1); + static ClusterMetadata metadata; + static byte[] serialized; + @Setup(Level.Trial) + public void setup() throws IOException + { + DatabaseDescriptor.daemonInitialization(); + int nodecount = 4000; + metadata = fakeMetadata(nodecount, 3, 3); + RegistrationStatus.instance.onRegistration(); + DataOutputBuffer buf = new DataOutputBuffer(2_000_000); + ClusterMetadata.serializer.serialize(metadata, buf, NodeVersion.CURRENT_METADATA_VERSION); + serialized = buf.toByteArray(); + } + + @Benchmark + public void bench() throws IOException + { + ClusterMetadata.serializer.deserialize(new DataInputBuffer(serialized), NodeVersion.CURRENT_METADATA_VERSION); + } + + public static ClusterMetadata fakeMetadata(int nodeCount, int dcCount, int rackCount) throws UnknownHostException + { + ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance); + TokenSupplier tokensupplier = TokenSupplier.evenlyDistributedTokens(nodeCount); + PlacementProvider placementProvider = new UniformRangePlacement(); + NodeVersion nodeVersion = new NodeVersion(new CassandraVersion("6.0.0"), NodeVersion.CURRENT_METADATA_VERSION); + for (int i = 1; i < nodeCount; i++) + { + ClusterMetadata.Transformer transformer = metadata.transformer(); + UUID uuid = UUID.randomUUID(); + NodeAddresses addresses = addresses(uuid, i); + metadata = transformer.register(addresses, new Location("dc" + random.nextInt(dcCount), "rack"+random.nextInt(rackCount)), nodeVersion).build().metadata; + NodeId nodeId = metadata.directory.peerId(addresses.broadcastAddress); + metadata = new UnsafeJoin(nodeId, Collections.singleton(new Murmur3Partitioner.LongToken(tokensupplier.token(i))), placementProvider).execute(metadata).success().metadata; + } + + return metadata; + } + + static NodeAddresses addresses(UUID uuid, int idx) throws UnknownHostException + { + byte [] address = new byte [] {127, 0, + (byte) (((idx + 1) & 0x0000ff00) >> 8), + (byte) ((idx + 1) & 0x000000ff)}; + + InetAddressAndPort host = InetAddressAndPort.getByAddress(address); + return new NodeAddresses(uuid, host, host, host); + } + + public static void main(String[] args) throws RunnerException, UnknownHostException + { + Options options = new OptionsBuilder() + .include(DirectorySerializationBench.class.getSimpleName()) + .build(); + new Runner(options).run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
