This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch cassandra-6.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9af2b2cdf8d8c4a3088673b9d67b0e97518c48e8
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Mar 16 15:04:41 2026 +0100

    Improve performance deserializing cluster metadata
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-21224
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/tcm/membership/Directory.java | 167 +++++++++++++--------
 .../microbench/DirectorySerializationBench.java    | 126 ++++++++++++++++
 3 files changed, 234 insertions(+), 60 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 86f388f22d..ff76806971 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 6.0-alpha2
+ * Improve performance when deserializing cluster metadata  (CASSANDRA-21224)
  * Minor TokenMap performance improvement (CASSANDRA-21223)
  * Handle lost response when committing PrepareMove (CASSANDRA-21222)
 * SEPExecutor.maybeExecuteImmediately does not always execute tasks immediately despite available worker capacity (CASSANDRA-21429)
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 8c7942a1fd..55a5e7ac96 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -50,7 +50,6 @@ import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.MetadataValue;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
 import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.UUIDSerializer;
 import org.apache.cassandra.utils.btree.BTreeBiMap;
 import org.apache.cassandra.utils.btree.BTreeMap;
@@ -58,6 +57,7 @@ import org.apache.cassandra.utils.btree.BTreeMultimap;
 
 import static org.apache.cassandra.db.TypeSizes.sizeof;
 import static org.apache.cassandra.tcm.membership.NodeVersion.CURRENT;
+import static 
org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION;
 
 public class Directory implements MetadataValue<Directory>
 {
@@ -106,6 +106,41 @@ public class Directory implements MetadataValue<Directory>
                       BTreeMap<NodeId, NodeAddresses> addresses,
                       BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
                       BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC)
+    {
+        this(nextId, lastModified, peers, removedNodes, locations, states, 
versions, hostIds, addresses, endpointsByDC, racksByDC, clusterVersions(states, 
versions));
+    }
+
+    private Directory(int nextId,
+                      Epoch lastModified,
+                      BTreeBiMap<NodeId, InetAddressAndPort> peers,
+                      BTreeSet<RemovedNode> removedNodes,
+                      BTreeMap<NodeId, Location> locations,
+                      BTreeMap<NodeId, NodeState> states,
+                      BTreeMap<NodeId, NodeVersion> versions,
+                      BTreeBiMap<NodeId, UUID> hostIds,
+                      BTreeMap<NodeId, NodeAddresses> addresses,
+                      BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
+                      BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC,
+                      ClusterVersions clusterVersions)
+    {
+        this(nextId, lastModified, peers, removedNodes, locations, states, 
versions, hostIds, addresses, endpointsByDC, racksByDC,
+             clusterVersions.clusterMinVersion, 
clusterVersions.clusterMaxVersion, clusterVersions.commonSerializationVersion);
+    }
+
+    private Directory(int nextId,
+                      Epoch lastModified,
+                      BTreeBiMap<NodeId, InetAddressAndPort> peers,
+                      BTreeSet<RemovedNode> removedNodes,
+                      BTreeMap<NodeId, Location> locations,
+                      BTreeMap<NodeId, NodeState> states,
+                      BTreeMap<NodeId, NodeVersion> versions,
+                      BTreeBiMap<NodeId, UUID> hostIds,
+                      BTreeMap<NodeId, NodeAddresses> addresses,
+                      BTreeMultimap<String, InetAddressAndPort> endpointsByDC,
+                      BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC,
+                      NodeVersion clusterMinVersion,
+                      NodeVersion clusterMaxVersion,
+                      Version commonSerializationVersion)
     {
         this.nextId = nextId;
         this.lastModified = lastModified;
@@ -118,10 +153,9 @@ public class Directory implements MetadataValue<Directory>
         this.addresses = addresses;
         this.endpointsByDC = endpointsByDC;
         this.racksByDC = racksByDC;
-        Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states, 
versions);
-        clusterMinVersion = minMaxVer.left;
-        clusterMaxVersion = minMaxVer.right;
-        commonSerializationVersion = minCommonSerializationVersion(states, 
versions);
+        this.clusterMinVersion = clusterMinVersion;
+        this.clusterMaxVersion = clusterMaxVersion;
+        this.commonSerializationVersion = commonSerializationVersion;
     }
 
     @Override
@@ -161,7 +195,7 @@ public class Directory implements MetadataValue<Directory>
     @Override
     public Directory withLastModified(Epoch epoch)
     {
-        return new Directory(nextId, epoch, peers, removedNodes, locations, 
states, versions, hostIds, addresses, endpointsByDC, racksByDC);
+        return new Directory(nextId, epoch, peers, removedNodes, locations, 
states, versions, hostIds, addresses, endpointsByDC, racksByDC, 
clusterMinVersion, clusterMaxVersion, commonSerializationVersion);
     }
 
     public Directory withNonUpgradedNode(NodeAddresses addresses,
@@ -250,9 +284,18 @@ public class Directory implements MetadataValue<Directory>
         BTreeMap<String, Multimap<String, InetAddressAndPort>> 
updatedEndpointsByRack = racksByDC.withForce(location(id).datacenter, rackEP);
 
         return new Directory(nextId, lastModified,
-                             
peers.withForce(id,nodeAddresses.broadcastAddress), removedNodes, locations, 
states, versions, hostIds, addresses.withForce(id, nodeAddresses),
+                             peers.withForce(id, 
nodeAddresses.broadcastAddress),
+                             removedNodes,
+                             locations,
+                             states,
+                             versions,
+                             hostIds,
+                             addresses.withForce(id, nodeAddresses),
                              updatedEndpointsByDC,
-                             updatedEndpointsByRack);
+                             updatedEndpointsByRack,
+                             clusterMinVersion,
+                             clusterMaxVersion,
+                             commonSerializationVersion);
     }
 
     public Directory withRackAndDC(NodeId id)
@@ -266,7 +309,10 @@ public class Directory implements MetadataValue<Directory>
 
         return new Directory(nextId, lastModified, peers, removedNodes, 
locations, states, versions, hostIds, addresses,
                              endpointsByDC.with(location.datacenter, endpoint),
-                             racksByDC.withForce(location.datacenter, rackEP));
+                             racksByDC.withForce(location.datacenter, rackEP),
+                             clusterMinVersion,
+                             clusterMaxVersion,
+                             commonSerializationVersion);
     }
 
     public Directory withoutRackAndDC(NodeId id)
@@ -286,7 +332,10 @@ public class Directory implements MetadataValue<Directory>
             newRacksByDC = racksByDC.withForce(location.datacenter, rackEP);
         return new Directory(nextId, lastModified, peers, removedNodes, 
locations, states, versions, hostIds, addresses,
                              endpointsByDC.without(location.datacenter, 
endpoint),
-                             newRacksByDC);
+                             newRacksByDC,
+                             clusterMinVersion,
+                             clusterMaxVersion,
+                             commonSerializationVersion);
     }
 
     public Directory withUpdatedRackAndDc(NodeId id, Location location)
@@ -306,23 +355,7 @@ public class Directory implements MetadataValue<Directory>
             return this;
 
         return new Directory(nextId, lastModified, peers, removedNodes, 
locations.withForce(id, location), states, versions, hostIds,
-                             addresses, endpointsByDC, racksByDC);
-    }
-
-    public Directory removed(Epoch removedIn, NodeId id, InetAddressAndPort 
addr)
-    {
-        Invariants.require(!peers.containsKey(id));
-        return new Directory(nextId,
-                             lastModified,
-                             peers,
-                             removedNodes.with(new RemovedNode(removedIn, id, 
addr)),
-                             locations,
-                             states,
-                             versions,
-                             hostIds,
-                             addresses,
-                             endpointsByDC,
-                             racksByDC);
+                             addresses, endpointsByDC, racksByDC, 
clusterMinVersion, clusterMaxVersion, commonSerializationVersion);
     }
 
     public Directory without(Epoch removedIn, NodeId id)
@@ -641,14 +674,23 @@ public class Directory implements MetadataValue<Directory>
             if (version.isAtLeast(Version.V1))
                 nextId = in.readInt();
             int count = in.readInt();
-            Directory newDir = new Directory();
 
+            BTreeBiMap<NodeId, InetAddressAndPort> peers = BTreeBiMap.empty();
+            BTreeMap<NodeId, Location> locations = BTreeMap.empty();
+            BTreeMap<NodeId, NodeState> states = BTreeMap.empty();
+            BTreeMap<NodeId, NodeVersion> versions = BTreeMap.empty();
+            BTreeBiMap<NodeId, UUID> hostIds = BTreeBiMap.empty();
+            BTreeMap<NodeId, NodeAddresses> addresses = BTreeMap.empty();
             for (int i = 0; i < count; i++)
             {
                 Node n = Node.serializer.deserialize(in, version);
-                // todo: bulk operations
-                newDir = newDir.with(n.addresses, n.id, n.hostId, n.location, 
n.version)
-                               .withNodeState(n.id, n.state);
+                NodeId id = n.id;
+                peers = peers.withForce(id, n.addresses.broadcastAddress);
+                locations = locations.withForce(id, n.location);
+                states = states.withForce(id, n.state);
+                versions = versions.withForce(id, n.version);
+                hostIds = hostIds.withForce(id, n.hostId);
+                addresses = addresses.withForce(id, n.addresses);
             }
 
             int dcCount = in.readInt();
@@ -677,7 +719,7 @@ public class Directory implements MetadataValue<Directory>
             if (version.isBefore(Version.V1))
             {
                 NodeId maxId = null;
-                for (NodeId id : newDir.peers.keySet())
+                for (NodeId id : peers.keySet())
                 {
                     if (maxId == null || id.compareTo(maxId) > 0)
                         maxId = id;
@@ -688,7 +730,7 @@ public class Directory implements MetadataValue<Directory>
                 else
                     nextId = maxId.id() + 1;
             }
-
+            BTreeSet<RemovedNode> removed = 
BTreeSet.empty(RemovedNode::compareTo);
             if (version.isAtLeast(Version.V7))
             {
                 int removedNodes = in.readInt();
@@ -697,19 +739,20 @@ public class Directory implements MetadataValue<Directory>
                     long epoch = in.readLong();
                     NodeId nodeId = NodeId.serializer.deserialize(in, version);
                     InetAddressAndPort addr = 
InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version);
-                    newDir.removed(Epoch.create(epoch), nodeId, addr);
+                    Invariants.require(!peers.containsKey(nodeId));
+                    removed = removed.with(new 
RemovedNode(Epoch.create(epoch), nodeId, addr));
                 }
             }
 
             return new Directory(nextId,
                                  lastModified,
-                                 newDir.peers,
-                                 newDir.removedNodes,
-                                 newDir.locations,
-                                 newDir.states,
-                                 newDir.versions,
-                                 newDir.hostIds,
-                                 newDir.addresses,
+                                 peers,
+                                 removed,
+                                 locations,
+                                 states,
+                                 versions,
+                                 hostIds,
+                                 addresses,
                                  dcEndpoints,
                                  racksByDC);
         }
@@ -769,10 +812,11 @@ public class Directory implements MetadataValue<Directory>
                equivalentTo(directory);
     }
 
-    private static Pair<NodeVersion, NodeVersion> 
minMaxVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, 
NodeVersion> versions)
+    private static ClusterVersions clusterVersions(BTreeMap<NodeId, NodeState> 
states, BTreeMap<NodeId, NodeVersion> versions)
     {
         NodeVersion minVersion = null;
         NodeVersion maxVersion = null;
+        int commonVersion = Integer.MAX_VALUE;
         for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
         {
             if (entry.getValue() != NodeState.LEFT)
@@ -782,26 +826,15 @@ public class Directory implements MetadataValue<Directory>
                     minVersion = ver;
                 if (maxVersion == null || ver.compareTo(maxVersion) > 0)
                     maxVersion = ver;
-            }
-        }
-        if (minVersion == null)
-            return Pair.create(CURRENT, CURRENT);
-        return Pair.create(minVersion, maxVersion);
-    }
-
-    public static Version minCommonSerializationVersion(BTreeMap<NodeId, 
NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
-    {
-        int commonVersion = Integer.MAX_VALUE;
-        for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
-        {
-            if (entry.getValue() != NodeState.LEFT)
-            {
-                NodeVersion ver = versions.get(entry.getKey());
                 if (ver.serializationVersion > Version.OLD.asInt() && 
ver.serializationVersion < commonVersion)
                     commonVersion = ver.serializationVersion;
             }
         }
-        return commonVersion == Integer.MAX_VALUE ? 
NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion);
+        if (minVersion == null)
+            return new ClusterVersions(CURRENT, CURRENT, 
CURRENT_METADATA_VERSION);
+
+        return new ClusterVersions(minVersion, maxVersion,
+                                   commonVersion == Integer.MAX_VALUE ? 
NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion));
     }
 
     @Override
@@ -825,7 +858,8 @@ public class Directory implements MetadataValue<Directory>
                Objects.equals(endpointsByDC, directory.endpointsByDC) &&
                Objects.equals(racksByDC, directory.racksByDC) &&
                Objects.equals(versions, directory.versions) &&
-               Objects.equals(addresses, directory.addresses);
+               Objects.equals(addresses, directory.addresses) &&
+               Objects.equals(removedNodes, directory.removedNodes);
     }
     
     private static final Logger logger = 
LoggerFactory.getLogger(Directory.class);
@@ -891,7 +925,6 @@ public class Directory implements MetadataValue<Directory>
 
     }
 
-
     public static class RemovedNode implements Comparable<RemovedNode>
     {
         public final Epoch removedIn;
@@ -923,4 +956,18 @@ public class Directory implements MetadataValue<Directory>
             return id.compareTo(o.id);
         }
     }
+
+    private static class ClusterVersions
+    {
+        private final NodeVersion clusterMinVersion;
+        private final NodeVersion clusterMaxVersion;
+        private final Version commonSerializationVersion;
+        public ClusterVersions(NodeVersion clusterMinVersion, NodeVersion 
clusterMaxVersion, Version commonSerializationVersion)
+        {
+
+            this.clusterMinVersion = clusterMinVersion;
+            this.clusterMaxVersion = clusterMaxVersion;
+            this.commonSerializationVersion = commonSerializationVersion;
+        }
+    }
 }
diff --git a/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java b/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java
new file mode 100644
index 0000000000..5b9bf7bd69
--- /dev/null
+++ b/test/microbench/org/apache/cassandra/test/microbench/DirectorySerializationBench.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.test.microbench;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.RegistrationStatus;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
+import org.apache.cassandra.tcm.transformations.UnsafeJoin;
+import org.apache.cassandra.utils.CassandraVersion;
+
+@State(Scope.Benchmark)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 1)
+@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000)
+@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 30000)
+public class DirectorySerializationBench
+{
+    static Random random = new Random(1);
+    static ClusterMetadata metadata;
+    static byte[] serialized;
+    @Setup(Level.Trial)
+    public void setup() throws IOException
+    {
+        DatabaseDescriptor.daemonInitialization();
+        int nodecount = 4000;
+        metadata = fakeMetadata(nodecount, 3, 3);
+        RegistrationStatus.instance.onRegistration();
+        DataOutputBuffer buf = new DataOutputBuffer(2_000_000);
+        ClusterMetadata.serializer.serialize(metadata, buf, 
NodeVersion.CURRENT_METADATA_VERSION);
+        serialized = buf.toByteArray();
+    }
+
+    @Benchmark
+    public void bench() throws IOException
+    {
+        ClusterMetadata.serializer.deserialize(new 
DataInputBuffer(serialized), NodeVersion.CURRENT_METADATA_VERSION);
+    }
+
+    public static ClusterMetadata fakeMetadata(int nodeCount, int dcCount, int 
rackCount) throws UnknownHostException
+    {
+        ClusterMetadata metadata = new 
ClusterMetadata(Murmur3Partitioner.instance);
+        TokenSupplier tokensupplier = 
TokenSupplier.evenlyDistributedTokens(nodeCount);
+        PlacementProvider placementProvider = new UniformRangePlacement();
+        NodeVersion nodeVersion = new NodeVersion(new 
CassandraVersion("6.0.0"), NodeVersion.CURRENT_METADATA_VERSION);
+        for (int i = 1; i < nodeCount; i++)
+        {
+            ClusterMetadata.Transformer transformer = metadata.transformer();
+            UUID uuid = UUID.randomUUID();
+            NodeAddresses addresses = addresses(uuid, i);
+            metadata = transformer.register(addresses, new Location("dc" + 
random.nextInt(dcCount), "rack"+random.nextInt(rackCount)), 
nodeVersion).build().metadata;
+            NodeId nodeId = 
metadata.directory.peerId(addresses.broadcastAddress);
+            metadata = new UnsafeJoin(nodeId, Collections.singleton(new 
Murmur3Partitioner.LongToken(tokensupplier.token(i))), 
placementProvider).execute(metadata).success().metadata;
+        }
+
+        return metadata;
+    }
+
+    static NodeAddresses addresses(UUID uuid, int idx) throws 
UnknownHostException
+    {
+        byte [] address = new byte [] {127, 0,
+                                       (byte) (((idx + 1) & 0x0000ff00) >> 8),
+                                       (byte) ((idx + 1) & 0x000000ff)};
+
+        InetAddressAndPort host = InetAddressAndPort.getByAddress(address);
+        return new NodeAddresses(uuid, host, host, host);
+    }
+
+    public static void main(String[] args) throws RunnerException, 
UnknownHostException
+    {
+        Options options = new OptionsBuilder()
+                          
.include(DirectorySerializationBench.class.getSimpleName())
+                          .build();
+        new Runner(options).run();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to