This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 415eaffb9cc50df5a12a330e14027ef60c07ee02
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Fri Jan 31 10:02:49 2025 +0100

    Reduce heap pressure when initializing CMS
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20267
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   3 +-
 src/java/org/apache/cassandra/net/Verb.java        |   8 +-
 .../apache/cassandra/schema/SchemaKeyspace.java    |  28 ++++
 .../cassandra/tcm/ClusterMetadataService.java      |   2 +-
 src/java/org/apache/cassandra/tcm/Startup.java     |   7 +-
 .../tcm/migration/CMSInitializationRequest.java    | 169 +++++++++++++++++++++
 .../tcm/migration/CMSInitializationResponse.java   |  74 +++++++++
 .../tcm/migration/ClusterMetadataHolder.java       |  95 ------------
 .../apache/cassandra/tcm/migration/Election.java   | 160 +++++++------------
 .../tcm/serialization/MessageSerializers.java      |   8 +-
 .../ClusterMetadataUpgradeIgnoreHostsTest.java     |  50 ------
 .../upgrade/ClusterMetadataUpgradeTest.java        |  36 +++++
 13 files changed, 379 insertions(+), 262 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 630a13c373..685d86119b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Reduce heap pressure when initializing CMS (CASSANDRA-20267)
  * Paxos Repair: NoSuchElementException on DistributedSchema.getKeyspaceMetadata (CASSANDRA-20320)
  * Improve performance of DistributedSchema.validate for large schemas (CASSANDRA-20360)
  * Add JSON constraint (CASSANDRA-20273)
diff --git a/NEWS.txt b/NEWS.txt
index bb8898d53b..b35a3c0274 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -149,7 +149,8 @@ Upgrading
     However, nodes still UP and running the old version will. This will eventually cause the migration to fail, as the
     cluster will not be in agreement.
       - > nodetool cms initialize
-        Got mismatching cluster metadatas from [/x.x.x.x:7000] aborting migration
+        Got mismatching cluster metadatas. Check logs on peers ([/x.x.x.x:7000]) for details of mismatches.
+        Aborting migration.
         See 'nodetool help' or 'nodetool help <command>'.
     If the cms initialize command fails, it will indicate which nodes’ current metadata does not agree with the node
     where the command was executed. To mitigate this situation, bring any mismatching nodes DOWN and rerun the
diff --git a/src/java/org/apache/cassandra/net/Verb.java b/src/java/org/apache/cassandra/net/Verb.java
index 17c4550fc0..c2cce663ef 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -94,7 +94,9 @@ import org.apache.cassandra.tcm.Discovery;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.FetchCMSLog;
 import org.apache.cassandra.tcm.FetchPeerLog;
+import org.apache.cassandra.tcm.migration.CMSInitializationResponse;
 import org.apache.cassandra.tcm.migration.Election;
+import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
 import org.apache.cassandra.tcm.sequences.DataMovements;
 import org.apache.cassandra.tcm.serialization.MessageSerializers;
 import org.apache.cassandra.utils.BooleanSerializer;
@@ -232,9 +234,9 @@ public enum Verb
     TCM_NOTIFY_RSP         (806, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
ResponseVerbHandler.instance                                 ),
     TCM_NOTIFY_REQ         (807, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::logStateSerializer,             () -> logNotifyHandler(),   
                TCM_NOTIFY_RSP         ),
     TCM_CURRENT_EPOCH_REQ  (808, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Epoch.messageSerializer,                      () -> 
currentEpochRequestHandler(),         TCM_NOTIFY_RSP         ),
-    TCM_INIT_MIG_RSP       (809, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::metadataHolderSerializer,       () -> 
ResponseVerbHandler.instance                                 ),
-    TCM_INIT_MIG_REQ       (810, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.prepareHandler,     TCM_INIT_MIG_RSP       ),
-    TCM_ABORT_MIG          (811, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Election.Initiator.serializer,                () -> 
Election.instance.abortHandler,       TCM_INIT_MIG_RSP       ),
+    TCM_INIT_MIG_RSP       (809, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> CMSInitializationResponse.serializer,         () -> 
ResponseVerbHandler.instance                                 ),
+    TCM_INIT_MIG_REQ       (810, P0, rpcTimeout,      INTERNAL_METADATA,    
MessageSerializers::initRequestSerializer,          () -> 
Election.instance.prepareHandler,     TCM_INIT_MIG_RSP       ),
+    TCM_ABORT_MIG          (811, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> CMSInitializationRequest.Initiator.serializer,() -> 
Election.instance.abortHandler,       TCM_INIT_MIG_RSP       ),
     TCM_DISCOVER_RSP       (812, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> Discovery.serializer,                         () -> 
ResponseVerbHandler.instance                                 ),
     TCM_DISCOVER_REQ       (813, P0, rpcTimeout,      INTERNAL_METADATA,    () 
-> NoPayload.serializer,                         () -> 
Discovery.instance.requestHandler,    TCM_DISCOVER_RSP       ),
     TCM_FETCH_PEER_LOG_RSP (818, P0, rpcTimeout,      FETCH_LOG,            
MessageSerializers::logStateSerializer,             () -> 
ResponseVerbHandler.instance                                 ),
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index a6877cc631..b3ed1c2702 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -364,6 +364,34 @@ public final class SchemaKeyspace
             ALL.forEach(table -> 
FBUtilities.waitOnFuture(getSchemaCFS(table).forceFlush(ColumnFamilyStore.FlushReason.INTERNALLY_FORCED)));
     }
 
+    /**
+     * Read schema from system keyspace and calculate MD5 digest of every row, 
resulting digest
+     * will be converted into UUID which would act as content-based version of 
the schema.
+     *
+     * Only used when initializing the CMS
+     */
+    public static UUID calculateSchemaDigest()
+    {
+        Digest digest = Digest.forSchema();
+        for (String table : ALL)
+        {
+            ReadCommand cmd = getReadCommandForTableSchema(table);
+            try (ReadExecutionController executionController = 
cmd.executionController();
+                 PartitionIterator schema = 
cmd.executeInternal(executionController))
+            {
+                while (schema.hasNext())
+                {
+                    try (RowIterator partition = schema.next())
+                    {
+                        if 
(!isSystemKeyspaceSchemaPartition(partition.partitionKey()))
+                            RowIterators.digest(partition, digest);
+                    }
+                }
+            }
+        }
+        return UUID.nameUUIDFromBytes(digest.digest());
+    }
+
     /**
      * @param schemaTableName The name of the table responsible for part of 
the schema
      * @return CFS responsible to hold low-level serialized schema
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 555cdb7f93..8adfb81051 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -362,7 +362,7 @@ public class ClusterMetadataService
                                                                
!ignored.contains(ep))
                                                  .collect(toImmutableSet());
 
-            Election.instance.nominateSelf(candidates, ignored, 
metadata::equals, metadata);
+            Election.instance.nominateSelf(candidates, ignored, metadata, 
true);
             ClusterMetadataService.instance().triggerSnapshot();
         }
         else
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index b184866f57..151d24ab90 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -59,6 +59,7 @@ import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.migration.Election;
+import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
@@ -229,8 +230,8 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
             {
                 Election.instance.nominateSelf(candidates.nodes(),
                                                
Collections.singleton(FBUtilities.getBroadcastAddressAndPort()),
-                                               (cm) -> true,
-                                               null);
+                                               ClusterMetadata.current(),
+                                               false);
             }
         }
 
@@ -243,7 +244,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
             }
             else
             {
-                Election.Initiator initiator = Election.instance.initiator();
+                CMSInitializationRequest.Initiator initiator = 
Election.instance.initiator();
                 candidates = Discovery.instance.discoverOnce(initiator == null 
? null : initiator.initiator);
             }
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
new file mode 100644
index 0000000000..dac50e5edb
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.migration;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.UUID;
+
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaKeyspace;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.TokenMap;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.UUIDSerializer;
+
+public class CMSInitializationRequest
+{
+    public static final IVersionedSerializer<CMSInitializationRequest> 
defaultMessageSerializer = new 
Serializer(NodeVersion.CURRENT.serializationVersion());
+
+    private static volatile Serializer serializerCache;
+
+    public static IVersionedSerializer<CMSInitializationRequest> 
messageSerializer(Version version)
+    {
+        Serializer cached = serializerCache;
+        if (cached != null && cached.serializationVersion.equals(version))
+            return cached;
+        cached = new Serializer(version);
+        serializerCache = cached;
+        return cached;
+    }
+
+    public final Initiator initiator;
+    public final Directory directory;
+    public final TokenMap tokenMap;
+    public final UUID schemaVersion;
+
+    public CMSInitializationRequest(InetAddressAndPort initiator, UUID 
initToken, ClusterMetadata metadata)
+    {
+        this(new Initiator(initiator, initToken), metadata.directory, 
metadata.tokenMap, SchemaKeyspace.calculateSchemaDigest());
+    }
+
+    public CMSInitializationRequest(Initiator initiator, Directory directory, 
TokenMap tokenMap, UUID schemaVersion)
+    {
+        this.initiator = initiator;
+        this.directory = directory;
+        this.tokenMap = tokenMap;
+        this.schemaVersion = schemaVersion;
+    }
+
+    public static class Serializer implements 
IVersionedSerializer<CMSInitializationRequest>
+    {
+        private final Version serializationVersion;
+
+        public Serializer(Version serializationVersion)
+        {
+            this.serializationVersion = serializationVersion;
+        }
+
+        @Override
+        public void serialize(CMSInitializationRequest t, DataOutputPlus out, 
int version) throws IOException
+        {
+            Initiator.serializer.serialize(t.initiator, out, version);
+            Directory.serializer.serialize(t.directory, out, 
serializationVersion);
+            TokenMap.serializer.serialize(t.tokenMap, out, 
serializationVersion);
+            UUIDSerializer.serializer.serialize(t.schemaVersion, out, version);
+        }
+
+        @Override
+        public CMSInitializationRequest deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            Initiator initiator = Initiator.serializer.deserialize(in, 
version);
+            Directory directory = Directory.serializer.deserialize(in, 
serializationVersion);
+            TokenMap tokenMap = TokenMap.serializer.deserialize(in, 
serializationVersion);
+            UUID schemaVersion = UUIDSerializer.serializer.deserialize(in, 
version);
+            return new CMSInitializationRequest(initiator, directory, 
tokenMap, schemaVersion);
+        }
+
+        @Override
+        public long serializedSize(CMSInitializationRequest t, int version)
+        {
+            return Initiator.serializer.serializedSize(t.initiator, version) +
+                   Directory.serializer.serializedSize(t.directory, 
serializationVersion) +
+                   TokenMap.serializer.serializedSize(t.tokenMap, 
serializationVersion) +
+                   UUIDSerializer.serializer.serializedSize(t.schemaVersion, 
version);
+        }
+    }
+
+    public static class Initiator
+    {
+        public static final Serializer serializer = new Serializer();
+        public final InetAddressAndPort initiator;
+        public final UUID initToken;
+
+        public Initiator(InetAddressAndPort initiator, UUID initToken)
+        {
+            this.initiator = initiator;
+            this.initToken = initToken;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (this == o) return true;
+            if (!(o instanceof Initiator)) return false;
+            Initiator other = (Initiator) o;
+            return Objects.equals(initiator, other.initiator) && 
Objects.equals(initToken, other.initToken);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(initiator, initToken);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Initiator{" +
+                   "initiator=" + initiator +
+                   ", initToken=" + initToken +
+                   '}';
+        }
+
+        public static class Serializer implements 
IVersionedSerializer<Initiator>
+        {
+            @Override
+            public void serialize(Initiator t, DataOutputPlus out, int 
version) throws IOException
+            {
+                
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator,
 out, version);
+                UUIDSerializer.serializer.serialize(t.initToken, out, version);
+            }
+
+            @Override
+            public Initiator deserialize(DataInputPlus in, int version) throws 
IOException
+            {
+                return new 
Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in,
 version),
+                                     UUIDSerializer.serializer.deserialize(in, 
version));
+            }
+
+            @Override
+            public long serializedSize(Initiator t, int version)
+            {
+                return 
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator,
 version) +
+                       UUIDSerializer.serializer.serializedSize(t.initToken, 
version);
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java
new file mode 100644
index 0000000000..73502fdb63
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.migration;
+
+import java.io.IOException;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+public class CMSInitializationResponse
+{
+    public static final IVersionedSerializer<CMSInitializationResponse> 
serializer = new Serializer();
+
+    public final CMSInitializationRequest.Initiator initiator;
+    public final boolean metadataMatches;
+
+    public CMSInitializationResponse(CMSInitializationRequest.Initiator 
initiator, boolean metadataMatches)
+    {
+        this.initiator = initiator;
+        this.metadataMatches = metadataMatches;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CMSInitializationResponse{" +
+               "initiator=" + initiator +
+               ", metadataMatches=" + metadataMatches +
+               '}';
+    }
+
+    private static class Serializer implements 
IVersionedSerializer<CMSInitializationResponse>
+    {
+        @Override
+        public void serialize(CMSInitializationResponse t, DataOutputPlus out, 
int version) throws IOException
+        {
+            
CMSInitializationRequest.Initiator.serializer.serialize(t.initiator, out, 
version);
+            out.writeBoolean(t.metadataMatches);
+        }
+
+        @Override
+        public CMSInitializationResponse deserialize(DataInputPlus in, int 
version) throws IOException
+        {
+            CMSInitializationRequest.Initiator coordinator = 
CMSInitializationRequest.Initiator.serializer.deserialize(in, version);
+            boolean metadataMatches = in.readBoolean();
+            return new CMSInitializationResponse(coordinator, metadataMatches);
+        }
+
+        @Override
+        public long serializedSize(CMSInitializationResponse t, int version)
+        {
+            return 
CMSInitializationRequest.Initiator.serializer.serializedSize(t.initiator, 
version) +
+                   TypeSizes.sizeof(t.metadataMatches);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java b/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
deleted file mode 100644
index fbb0bfa701..0000000000
--- a/src/java/org/apache/cassandra/tcm/migration/ClusterMetadataHolder.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.tcm.migration;
-
-import java.io.IOException;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer;
-import org.apache.cassandra.tcm.serialization.Version;
-
-public class ClusterMetadataHolder
-{
-    public static final IVersionedSerializer<ClusterMetadataHolder> 
defaultMessageSerializer = new 
ClusterMetadataHolder.Serializer(NodeVersion.CURRENT.serializationVersion());
-
-    private static volatile Serializer serializerCache;
-    public static IVersionedSerializer<ClusterMetadataHolder> 
messageSerializer(Version version)
-    {
-        Serializer cached = serializerCache;
-        if (cached != null && cached.serializationVersion.equals(version))
-            return cached;
-        cached = new Serializer(version);
-        serializerCache = cached;
-        return cached;
-    }
-
-    public final Election.Initiator coordinator;
-    public final ClusterMetadata metadata;
-
-    public ClusterMetadataHolder(Election.Initiator coordinator, 
ClusterMetadata metadata)
-    {
-        this.coordinator = coordinator;
-        this.metadata = metadata;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "ClusterMetadataHolder{" +
-               "coordinator=" + coordinator +
-               ", epoch=" + metadata.epoch +
-               '}';
-    }
-
-    private static class Serializer implements 
IVersionedSerializer<ClusterMetadataHolder>
-    {
-        private final Version serializationVersion;
-
-        public Serializer(Version serializationVersion)
-        {
-            this.serializationVersion = serializationVersion;
-        }
-
-        @Override
-        public void serialize(ClusterMetadataHolder t, DataOutputPlus out, int 
version) throws IOException
-        {
-            Election.Initiator.serializer.serialize(t.coordinator, out, 
version);
-            VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, 
t.metadata, out, serializationVersion);
-        }
-
-        @Override
-        public ClusterMetadataHolder deserialize(DataInputPlus in, int 
version) throws IOException
-        {
-            Election.Initiator coordinator = 
Election.Initiator.serializer.deserialize(in, version);
-            ClusterMetadata metadata = 
VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in);
-            return new ClusterMetadataHolder(coordinator, metadata);
-        }
-
-        @Override
-        public long serializedSize(ClusterMetadataHolder t, int version)
-        {
-            return Election.Initiator.serializer.serializedSize(t.coordinator, 
version) +
-                   
VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer, 
t.metadata, serializationVersion);
-        }
-    }
-}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 2bd2f7a392..04e1a8fa4c 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -20,14 +20,11 @@ package org.apache.cassandra.tcm.migration;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
@@ -36,12 +33,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.Startup;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.ownership.TokenMap;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.IVerbHandler;
@@ -52,7 +49,6 @@ import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.UUIDSerializer;
 
 /**
  * Election process establishes initial CMS leader, from which you can further 
evolve cluster metadata.
@@ -60,9 +56,9 @@ import org.apache.cassandra.utils.UUIDSerializer;
 public class Election
 {
     private static final Logger logger = 
LoggerFactory.getLogger(Election.class);
-    private static final Initiator MIGRATED = new Initiator(null, null);
+    private static final CMSInitializationRequest.Initiator MIGRATED = new 
CMSInitializationRequest.Initiator(null, null);
 
-    private final AtomicReference<Initiator> initiator = new 
AtomicReference<>();
+    private final AtomicReference<CMSInitializationRequest.Initiator> 
initiator = new AtomicReference<>();
 
     public static Election instance = new Election();
 
@@ -83,7 +79,7 @@ public class Election
         this.abortHandler = new AbortHandler();
     }
 
-    public void nominateSelf(Set<InetAddressAndPort> candidates, 
Set<InetAddressAndPort> ignoredEndpoints, Function<ClusterMetadata, Boolean> 
isMatch, ClusterMetadata metadata)
+    public void nominateSelf(Set<InetAddressAndPort> candidates, 
Set<InetAddressAndPort> ignoredEndpoints, ClusterMetadata metadata, boolean 
verifyAllPeersMetadata)
     {
         Set<InetAddressAndPort> sendTo = new HashSet<>(candidates);
         sendTo.removeAll(ignoredEndpoints);
@@ -91,7 +87,7 @@ public class Election
 
         try
         {
-            initiate(sendTo, isMatch, metadata);
+            initiate(sendTo, metadata, verifyAllPeersMetadata);
             finish(sendTo);
         }
         catch (Exception e)
@@ -101,13 +97,14 @@ public class Election
         }
     }
 
-    private void initiate(Set<InetAddressAndPort> sendTo, 
Function<ClusterMetadata, Boolean> isMatch, ClusterMetadata metadata)
+    private void initiate(Set<InetAddressAndPort> sendTo, ClusterMetadata 
metadata, boolean verifyAllPeersMetadata)
     {
-        if (!updateInitiator(null, new 
Initiator(FBUtilities.getBroadcastAddressAndPort(), UUID.randomUUID())))
+        CMSInitializationRequest initializationRequest = new 
CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(), 
UUID.randomUUID(), metadata);
+        if (!updateInitiator(null, initializationRequest.initiator))
             throw new IllegalStateException("Migration already initiated by " 
+ initiator.get());
 
         logger.info("No previous migration detected, initiating");
-        Collection<Pair<InetAddressAndPort, ClusterMetadataHolder>> metadatas 
= MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_INIT_MIG_REQ, 
initiator.get());
+        Collection<Pair<InetAddressAndPort, CMSInitializationResponse>> 
metadatas = MessageDelivery.fanoutAndWait(messaging, sendTo, 
Verb.TCM_INIT_MIG_REQ, initializationRequest);
         if (metadatas.size() != sendTo.size())
         {
             Set<InetAddressAndPort> responded = metadatas.stream().map(p -> 
p.left).collect(Collectors.toSet());
@@ -116,45 +113,38 @@ public class Election
             throw new IllegalStateException(msg);
         }
 
-        Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p -> 
!isMatch.apply(p.right.metadata)).map(p -> p.left).collect(Collectors.toSet());
-        if (!mismatching.isEmpty())
+        if (verifyAllPeersMetadata)
         {
-            String msg = String.format("Got mismatching cluster metadatas from 
%s aborting migration", mismatching);
-            Map<InetAddressAndPort, ClusterMetadataHolder> metadataMap = new 
HashMap<>();
-            metadatas.forEach(pair -> metadataMap.put(pair.left, pair.right));
-            if (metadata != null)
+            Set<InetAddressAndPort> mismatching = metadatas.stream().filter(p 
-> !p.right.metadataMatches).map(p -> p.left).collect(Collectors.toSet());
+            if (!mismatching.isEmpty())
             {
-                for (InetAddressAndPort e : mismatching)
-                {
-                    logger.warn("Diff with {}", e);
-                    metadata.dumpDiff(metadataMap.get(e).metadata);
-                }
+                String msg = String.format("Got mismatching cluster metadatas. 
Check logs on peers (%s) for details of mismatches. Aborting migration.", 
mismatching);
+                throw new IllegalStateException(msg);
             }
-            throw new IllegalStateException(msg);
         }
     }
 
     private void finish(Set<InetAddressAndPort> sendTo)
     {
-        Initiator currentCoordinator = initiator.get();
-        assert 
currentCoordinator.initiator.equals(FBUtilities.getBroadcastAddressAndPort());
+        CMSInitializationRequest.Initiator currentInitiator = initiator.get();
+        assert 
currentInitiator.initiator.equals(FBUtilities.getBroadcastAddressAndPort());
 
         Startup.initializeAsFirstCMSNode();
         Register.maybeRegister();
         
SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID());
 
-        updateInitiator(currentCoordinator, MIGRATED);
+        updateInitiator(currentInitiator, MIGRATED);
         MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, 
DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false));
     }
 
     private void abort(Set<InetAddressAndPort> sendTo)
     {
-        Initiator init = initiator.getAndSet(null);
+        CMSInitializationRequest.Initiator init = initiator.getAndSet(null);
         for (InetAddressAndPort ep : sendTo)
             messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep);
     }
 
-    public Initiator initiator()
+    public CMSInitializationRequest.Initiator initiator()
     {
         return initiator.get();
     }
@@ -164,103 +154,63 @@ public class Election
         initiator.set(MIGRATED);
     }
 
-    private boolean updateInitiator(Initiator expected, Initiator 
newCoordinator)
+    private boolean updateInitiator(CMSInitializationRequest.Initiator 
expected, CMSInitializationRequest.Initiator newInitiator)
     {
-        Initiator current = initiator.get();
-        return Objects.equals(current, expected) && 
initiator.compareAndSet(current, newCoordinator);
+        CMSInitializationRequest.Initiator current = initiator.get();
+        return Objects.equals(current, expected) && 
initiator.compareAndSet(current, newInitiator);
     }
 
     public boolean isMigrating()
     {
-        Initiator coordinator = initiator();
-        return coordinator != null && coordinator != MIGRATED;
+        CMSInitializationRequest.Initiator initiator = initiator();
+        return initiator != null && initiator != MIGRATED;
     }
 
-    public class PrepareHandler implements IVerbHandler<Initiator>
+    public class PrepareHandler implements 
IVerbHandler<CMSInitializationRequest>
     {
         @Override
-        public void doVerb(Message<Initiator> message) throws IOException
+        public void doVerb(Message<CMSInitializationRequest> message) throws 
IOException
         {
             logger.info("Received election initiation message {} from {}", 
message.payload, message.from());
-            if (!updateInitiator(null, message.payload))
+            if (!updateInitiator(null, message.payload.initiator))
                 throw new IllegalStateException(String.format("Got duplicate 
initiate migration message from %s, migration is already started by %s", 
message.from(), initiator()));
 
-            // todo; disallow ANY changes to state managed in ClusterMetadata
             logger.info("Sending initiation response");
-            messaging.send(message.responseWith(new 
ClusterMetadataHolder(message.payload, ClusterMetadata.current())), 
message.from());
+            Directory initiatorDirectory = message.payload.directory;
+            TokenMap initiatorTokenMap = message.payload.tokenMap;
+            UUID initiatorSchemaVersion = message.payload.schemaVersion;
+            ClusterMetadata metadata = ClusterMetadata.current();
+            boolean match = true;
+            if (!initiatorDirectory.equals(metadata.directory))
+            {
+                match = false;
+                logger.warn("Initiator directory different from our");
+                initiatorDirectory.dumpDiff(metadata.directory);
+            }
+            if (!initiatorTokenMap.equals(metadata.tokenMap))
+            {
+                match = false;
+                logger.warn("Initiator tokenmap different from ours");
+                initiatorTokenMap.dumpDiff(metadata.tokenMap);
+            }
+            UUID schemaDigest = SchemaKeyspace.calculateSchemaDigest();
+            if (!initiatorSchemaVersion.equals(schemaDigest))
+            {
+                match = false;
+                logger.warn("Initiator schema different from our: {} != {}", 
initiatorSchemaVersion, schemaDigest);
+            }
+            messaging.send(message.responseWith(new 
CMSInitializationResponse(message.payload.initiator, match)), message.from());
         }
     }
 
-    public class AbortHandler implements IVerbHandler<Initiator>
+    public class AbortHandler implements 
IVerbHandler<CMSInitializationRequest.Initiator>
     {
         @Override
-        public void doVerb(Message<Initiator> message) throws IOException
+        public void doVerb(Message<CMSInitializationRequest.Initiator> 
message) throws IOException
         {
             logger.info("Received election abort message {} from {}", 
message.payload, message.from());
             if (!message.from().equals(initiator().initiator) || 
!updateInitiator(message.payload, null))
                 logger.error("Could not clear initiator - initiator is set to 
{}, abort message received from {}", initiator(), message.payload);
         }
     }
-
-    public static class Initiator
-    {
-        public static final Serializer serializer = new Serializer();
-
-        public final InetAddressAndPort initiator;
-        public final UUID initToken;
-
-        public Initiator(InetAddressAndPort initiator, UUID initToken)
-        {
-            this.initiator = initiator;
-            this.initToken = initToken;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o) return true;
-            if (!(o instanceof Initiator)) return false;
-            Initiator other = (Initiator) o;
-            return Objects.equals(initiator, other.initiator) && 
Objects.equals(initToken, other.initToken);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(initiator, initToken);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Initiator{" +
-                   "initiator=" + initiator +
-                   ", initToken=" + initToken +
-                   '}';
-        }
-
-        public static class Serializer implements 
IVersionedSerializer<Initiator>
-        {
-            @Override
-            public void serialize(Initiator t, DataOutputPlus out, int 
version) throws IOException
-            {
-                
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator,
 out, version);
-                UUIDSerializer.serializer.serialize(t.initToken, out, version);
-            }
-
-            @Override
-            public Initiator deserialize(DataInputPlus in, int version) throws 
IOException
-            {
-                return new 
Initiator(InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in,
 version),
-                                     UUIDSerializer.serializer.deserialize(in, 
version));
-            }
-
-            @Override
-            public long serializedSize(Initiator t, int version)
-            {
-                return 
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator,
 version) +
-                       UUIDSerializer.serializer.serializedSize(t.initToken, 
version);
-            }
-        }
-    }
 }
diff --git 
a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java 
b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
index cfced3426b..deef0a4b1b 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.migration.ClusterMetadataHolder;
+import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
 
 /**
  * Provides IVersionedSerializers for internode messages where the payload 
includes
@@ -69,13 +69,13 @@ public class MessageSerializers
         return 
Commit.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
     }
 
-    public static IVersionedSerializer<ClusterMetadataHolder> 
metadataHolderSerializer()
+    public static IVersionedSerializer<CMSInitializationRequest> 
initRequestSerializer()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
         if (metadata == null || 
metadata.directory.clusterMinVersion.serializationVersion == 
NodeVersion.CURRENT.serializationVersion)
-            return ClusterMetadataHolder.defaultMessageSerializer;
+            return CMSInitializationRequest.defaultMessageSerializer;
 
         assert 
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
-        return 
ClusterMetadataHolder.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+        return 
CMSInitializationRequest.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
deleted file mode 100644
index d89b902d7c..0000000000
--- 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.upgrade;
-
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.Constants;
-import org.apache.cassandra.distributed.api.Feature;
-
-public class ClusterMetadataUpgradeIgnoreHostsTest extends UpgradeTestBase
-{
-    @Test
-    public void upgradeIgnoreHostsTest() throws Throwable
-    {
-        new TestCase()
-        .nodes(3)
-        .nodesToUpgrade(1, 2, 3)
-        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
-                                .set(Constants.KEY_DTEST_FULL_STARTUP, true))
-        .upgradesToCurrentFrom(v41)
-        .setup((cluster) -> {
-            cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
-            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
-        })
-        .runAfterClusterUpgrade((cluster) -> {
-            // todo; isolate node 3 - actually shutting it down makes us throw 
exceptions when test finishes
-            cluster.filters().allVerbs().to(3).drop();
-            cluster.filters().allVerbs().from(3).drop();
-            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().failure(); // node3 unreachable
-            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", 
"127.0.0.1").asserts().failure(); // can't ignore localhost
-            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", 
"127.0.0.3").asserts().success();
-        }).run();
-    }
-}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
index 87399d8a44..930933425c 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java
@@ -27,6 +27,12 @@ import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 
 import static org.junit.Assert.assertFalse;
@@ -79,4 +85,34 @@ public class ClusterMetadataUpgradeTest extends 
UpgradeTestBase
             
assertTrue(Arrays.toString(desc[0]).contains("NetworkTopologyStrategy"));
         }).run();
     }
+
+
+    @Test
+    public void upgradeMismatchTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2, 3)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                                .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            IInvokableInstance n3 = ((IInvokableInstance) cluster.get(3));
+            n3.runOnInstance(() -> {
+                ClusterMetadata metadata = ClusterMetadata.current();
+                Directory diffingDirectory = metadata.directory.with(new 
NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.99")), new 
Location("aaa", "bbb"));
+                ClusterMetadata diffing = 
ClusterMetadata.current().transformer().with(diffingDirectory).buildForGossipMode();
+                ClusterMetadataService.instance().setFromGossip(diffing);
+            });
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().failure();
+            cluster.get(3).logs().watchFor("Initiator directory different from 
our");
+            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", 
"127.0.0.3").asserts().success();
+            cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int 
primary key)"));
+        }).run();
+        }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to