This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f05b27502f Improve CMS initialization
f05b27502f is described below

commit f05b27502f665cb1ae32169a42e4c221ec01d02b
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Nov 20 15:52:34 2025 +0000

    Improve CMS initialization
    
    * Better handling of DOWN non-upgraded nodes
    * Test that aborted CMS initialization cleans up state properly
    * Clean up orphaned PreInitialize entries in the log on bounce
    * Unconditionally reset initiator during abort
    * Ensure that metadata log entries aren't exchanged before CMS
      initialization is complete
    * Precalculate common serialization version, excluding non-upgraded and
      LEFT nodes
    * Decide whether a metadata-impacting upgrade is in progress using the
      minimum common serialization version
    * Add metadata identifier to nodetool cms output
    
    Patch by Sam Tunnicliffe and Marcus Eriksson; reviewed by Sam
    Tunnicliffe and Marcus Eriksson for CASSANDRA-21036
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 CHANGES.txt                                        |   1 +
 .../cassandra/db/AbstractMutationVerbHandler.java  |   2 +-
 .../apache/cassandra/gms/GossipVerbHandler.java    |   4 +-
 .../apache/cassandra/net/ResponseVerbHandler.java  |   4 +
 .../org/apache/cassandra/tcm/CMSOperations.java    |   2 +
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |   2 +-
 src/java/org/apache/cassandra/tcm/Startup.java     |   5 +-
 .../cassandra/tcm/log/SystemKeyspaceStorage.java   |  19 +-
 .../apache/cassandra/tcm/membership/Directory.java |  20 ++
 .../cassandra/tcm/membership/NodeVersion.java      |  17 +-
 .../apache/cassandra/tcm/migration/Election.java   |  23 +--
 .../tcm/serialization/MessageSerializers.java      |  26 +--
 .../cassandra/tcm/serialization/Version.java       |  24 ++-
 .../tcm/transformations/cms/Initialize.java        |   6 +
 .../apache/cassandra/tools/nodetool/CMSAdmin.java  |   2 +
 ...erMetadataUpgradeCleanupPreInitializeTest.java} |  44 ++++-
 ...lusterMetadataUpgradeDelayedInitializeTest.java | 219 +++++++++++++++++++++
 .../ClusterMetadataUpgradeIgnoreHostsTest.java     | 107 ++++++++++
 ...lusterMetadataUpgradeUnexpectedFailureTest.java | 110 +++++++++++
 .../cassandra/tcm/membership/DirectoryTest.java    |  77 ++++++++
 .../cassandra/tcm/membership/NodeVersionTest.java  |  49 +++++
 21 files changed, 716 insertions(+), 47 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 06b33e9846..b0b278a751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Improve CMS initialization (CASSANDRA-21036)
  * Introducing comments and security labels for schema elements 
(CASSANDRA-20943)
  * Extend nodetool tablestats for dictionary memory usage (CASSANDRA-20940)
  * Introduce separate GCInspector thresholds for concurrent GC events 
(CASSANDRA-20980)
diff --git a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
index 05512e5a8e..5d2e4e3cdc 100644
--- a/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/AbstractMutationVerbHandler.java
@@ -56,7 +56,7 @@ public abstract class AbstractMutationVerbHandler<T extends 
IMutation> implement
 
     protected void processMessage(Message<T> message, InetAddressAndPort 
respondTo)
     {
-        if (message.epoch().isAfter(Epoch.EMPTY))
+        if (message.epoch().isAfter(Epoch.FIRST))
         {
             ClusterMetadata metadata = ClusterMetadata.current();
             metadata = checkTokenOwnership(metadata, message, respondTo);
diff --git a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
index bac0854d07..bf51b83a01 100644
--- a/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipVerbHandler.java
@@ -21,12 +21,14 @@ package org.apache.cassandra.gms;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
 
 public class GossipVerbHandler<T> implements IVerbHandler<T>
 {
     public void doVerb(Message<T> message)
     {
         
Gossiper.instance.setLastProcessedMessageAt(message.creationTimeMillis());
-        
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), 
message.epoch());
+        if (message.epoch().isAfter(Epoch.FIRST))
+            
ClusterMetadataService.instance().fetchLogFromPeerAsync(message.from(), 
message.epoch());
     }
 }
diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java 
b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
index 517a10fd2a..2550ca2929 100644
--- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
+++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -92,6 +93,9 @@ class ResponseVerbHandler implements IVerbHandler
     private void maybeFetchLogs(Message<?> message)
     {
         ClusterMetadata metadata = ClusterMetadata.current();
+        if (!message.epoch().isAfter(Epoch.FIRST))
+            return;
+
         if (!message.epoch().isAfter(metadata.epoch))
             return;
 
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java 
b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index a0917584d9..e439b1b9f0 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -63,6 +63,7 @@ public class CMSOperations implements CMSOperationsMBean
     public static final String LOCAL_PENDING = "LOCAL_PENDING";
     public static final String COMMITS_PAUSED = "COMMITS_PAUSED";
     public static final String REPLICATION_FACTOR = "REPLICATION_FACTOR";
+    public static final String CMS_ID = "CMS_ID";
 
     private static final Logger logger = 
LoggerFactory.getLogger(ClusterMetadataService.class);
     public static CMSOperations instance = new 
CMSOperations(ClusterMetadataService.instance());
@@ -161,6 +162,7 @@ public class CMSOperations implements CMSOperationsMBean
         info.put(LOCAL_PENDING, 
Integer.toString(cms.log().pendingBufferSize()));
         info.put(COMMITS_PAUSED, Boolean.toString(cms.commitsPaused()));
         info.put(REPLICATION_FACTOR, 
ReplicationParams.meta(metadata).toString());
+        info.put(CMS_ID, Integer.toString(metadata.metadataIdentifier));
         return info;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 6f4fac9f13..65cb869faf 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -1004,7 +1004,7 @@ public class ClusterMetadata
 
     public boolean metadataSerializationUpgradeInProgress()
     {
-        return 
!directory.clusterMaxVersion.serializationVersion().equals(directory.clusterMinVersion.serializationVersion());
+        return 
!directory.clusterMaxVersion.serializationVersion().equals(directory.commonSerializationVersion);
     }
 
     public static class Serializer implements 
MetadataSerializer<ClusterMetadata>
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index dab2d8387c..446c52a02e 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -58,8 +58,8 @@ import org.apache.cassandra.tcm.log.LogStorage;
 import org.apache.cassandra.tcm.log.SystemKeyspaceStorage;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
-import org.apache.cassandra.tcm.migration.Election;
 import org.apache.cassandra.tcm.migration.CMSInitializationRequest;
+import org.apache.cassandra.tcm.migration.Election;
 import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.InProgressSequences;
 import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
@@ -77,7 +77,7 @@ import static 
org.apache.cassandra.tcm.membership.NodeState.JOINED;
 import static org.apache.cassandra.tcm.membership.NodeState.LEFT;
 import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 
- /**
+/**
   * Initialize
   */
  public class Startup
@@ -141,7 +141,6 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         ClusterMetadataService.instance().log().bootstrap(addr, datacenter);
         ClusterMetadata metadata =  ClusterMetadata.current();
         assert ClusterMetadataService.state() == LOCAL : String.format("Can't 
initialize as node hasn't transitioned to CMS state. State: %s.\n%s", 
ClusterMetadataService.state(),  metadata);
-
         Initialize initialize = new 
Initialize(metadata.initializeClusterIdentifier(addr.hashCode()));
         ClusterMetadataService.instance().commit(initialize);
     }
diff --git a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java 
b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
index a0f6263e1c..92f4b78903 100644
--- a/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/SystemKeyspaceStorage.java
@@ -87,12 +87,25 @@ public class SystemKeyspaceStorage implements LogStorage
 
     public synchronized static boolean hasAnyEpoch()
     {
-        String query = String.format("SELECT epoch FROM %s.%s LIMIT 1", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
+        String query = String.format("SELECT epoch, kind FROM %s.%s LIMIT 2", 
SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME);
 
+        int count = 0;
+        long preInitializeEpoch = -1;
         for (UntypedResultSet.Row row : executeInternal(query))
-            return true;
+        {
+            count++;
+            if (Transformation.Kind.fromId(row.getInt("kind")) == 
Transformation.Kind.PRE_INITIALIZE_CMS)
+                preInitializeEpoch = row.getLong("epoch");
+        }
+        if (count == 1 && preInitializeEpoch != -1)
+        {
+            logger.warn("Cleaning up orphaned PreInitialize at epoch {} - 
restarting in gossip mode", preInitializeEpoch);
+            String cleanupQuery = String.format("DELETE from %s.%s where epoch 
= %d", SchemaConstants.SYSTEM_KEYSPACE_NAME, NAME, preInitializeEpoch);
+            executeInternal(cleanupQuery);
+            return false;
+        }
 
-        return false;
+        return count > 0;
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java 
b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 0f6cf6eb14..b7d8684976 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -73,6 +73,7 @@ public class Directory implements MetadataValue<Directory>
     private final BTreeMap<String, Multimap<String, InetAddressAndPort>> 
racksByDC;
     public final NodeVersion clusterMinVersion;
     public final NodeVersion clusterMaxVersion;
+    public final Version commonSerializationVersion;
 
     public Directory()
     {
@@ -115,6 +116,7 @@ public class Directory implements MetadataValue<Directory>
         Pair<NodeVersion, NodeVersion> minMaxVer = minMaxVersions(states, 
versions);
         clusterMinVersion = minMaxVer.left;
         clusterMaxVersion = minMaxVer.right;
+        commonSerializationVersion = minCommonSerializationVersion(states, 
versions);
     }
 
     @Override
@@ -131,6 +133,9 @@ public class Directory implements MetadataValue<Directory>
                ", hostIds=" + hostIds +
                ", endpointsByDC=" + endpointsByDC +
                ", racksByDC=" + racksByDC +
+               ", clusterMinVersion=" + clusterMinVersion +
+               ", clusterMaxVersion=" + clusterMaxVersion +
+               ", commonSerializationVersion=" + commonSerializationVersion +
                '}';
     }
 
@@ -778,6 +783,21 @@ public class Directory implements MetadataValue<Directory>
         return Pair.create(minVersion, maxVersion);
     }
 
+    public static Version minCommonSerializationVersion(BTreeMap<NodeId, 
NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
+    {
+        int commonVersion = Integer.MAX_VALUE;
+        for (Map.Entry<NodeId, NodeState> entry : states.entrySet())
+        {
+            if (entry.getValue() != NodeState.LEFT)
+            {
+                NodeVersion ver = versions.get(entry.getKey());
+                if (ver.serializationVersion > Version.OLD.asInt() && 
ver.serializationVersion < commonVersion)
+                    commonVersion = ver.serializationVersion;
+            }
+        }
+        return commonVersion == Integer.MAX_VALUE ? 
NodeVersion.CURRENT_METADATA_VERSION : Version.fromInt(commonVersion);
+    }
+
     @Override
     public int hashCode()
     {
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
index 9a15951ed0..33d60da6fe 100644
--- a/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeVersion.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.membership;
 import java.io.IOException;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.tcm.serialization.MetadataSerializer;
@@ -39,6 +40,8 @@ public class NodeVersion implements Comparable<NodeVersion>
     private static final CassandraVersion SINCE_VERSION = 
CassandraVersion.CASSANDRA_5_1;
 
     public final CassandraVersion cassandraVersion;
+
+    // this must be kept as an int, otherwise we can't deserialize future 
NodeVersions
     public final int serializationVersion;
 
     public NodeVersion(CassandraVersion cassandraVersion, Version 
serializationVersion)
@@ -113,10 +116,9 @@ public class NodeVersion implements Comparable<NodeVersion>
         @Override
         public void serialize(NodeVersion t, DataOutputPlus out, Version 
version) throws IOException
         {
-            out.writeUTF(t.cassandraVersion.toString());
             if (t.serializationVersion == Version.UNKNOWN.asInt())
                 throw new IllegalStateException("Should not serialize UNKNOWN 
version");
-            out.writeUnsignedVInt32(t.serializationVersion);
+            serializeHelper(out, t.cassandraVersion.toString(), 
t.serializationVersion);
         }
 
         @Override
@@ -133,5 +135,16 @@ public class NodeVersion implements Comparable<NodeVersion>
             return sizeof(t.cassandraVersion.toString()) +
                    sizeofUnsignedVInt(t.serializationVersion);
         }
+        /**
+         * Used to be able to generate a NodeVersion with a future 
serialization version, for test!
+         */
+        @VisibleForTesting
+        static void serializeHelper(DataOutputPlus out, String 
cassandraVersion, int serializationVersion) throws IOException
+        {
+            out.writeUTF(cassandraVersion);
+            out.writeUnsignedVInt32(serializationVersion);
+        }
     }
+
+
 }
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java 
b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 6ada116323..ee8bb36af5 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -89,25 +89,24 @@ public class Election
         Set<InetAddressAndPort> sendTo = new HashSet<>(candidates);
         sendTo.removeAll(ignoredEndpoints);
         sendTo.remove(FBUtilities.getBroadcastAddressAndPort());
-
+        CMSInitializationRequest initializationRequest = new 
CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(), 
UUID.randomUUID(), metadata);
+        if (!updateInitiator(null, initializationRequest.initiator))
+            throw new IllegalStateException("Migration already initiated by " 
+ initiator.get());
         try
         {
-            initiate(sendTo, metadata, verifyAllPeersMetadata);
+            initiate(initializationRequest, sendTo, metadata, 
verifyAllPeersMetadata);
             finish(sendTo);
         }
-        catch (Exception e)
+        catch (Throwable e)
         {
-            abort(sendTo);
+            logger.error("Got error nominating self", e);
+            abort(initializationRequest.initiator, sendTo);
             throw e;
         }
     }
 
-    private void initiate(Set<InetAddressAndPort> sendTo, ClusterMetadata 
metadata, boolean verifyAllPeersMetadata)
+    private void initiate(CMSInitializationRequest initializationRequest, 
Set<InetAddressAndPort> sendTo, ClusterMetadata metadata, boolean 
verifyAllPeersMetadata)
     {
-        CMSInitializationRequest initializationRequest = new 
CMSInitializationRequest(FBUtilities.getBroadcastAddressAndPort(), 
UUID.randomUUID(), metadata);
-        if (!updateInitiator(null, initializationRequest.initiator))
-            throw new IllegalStateException("Migration already initiated by " 
+ initiator.get());
-
         logger.info("No previous migration detected, initiating");
         Collection<Pair<InetAddressAndPort, CMSInitializationResponse>> 
metadatas = MessageDelivery.fanoutAndWait(messaging, sendTo, 
Verb.TCM_INIT_MIG_REQ, initializationRequest);
         if (metadatas.size() != sendTo.size())
@@ -149,9 +148,11 @@ public class Election
         }
     }
 
-    private void abort(Set<InetAddressAndPort> sendTo)
+    private void abort(CMSInitializationRequest.Initiator init, 
Set<InetAddressAndPort> sendTo)
     {
-        CMSInitializationRequest.Initiator init = initiator.getAndSet(null);
+        logger.info("Aborting migration");
+        CMSInitializationRequest.Initiator previous = 
initiator.getAndSet(null);
+        logger.info("Reset local initiator state (was {}), sending abort 
message to peers", previous);
         for (InetAddressAndPort ep : sendTo)
             messaging.send(Message.out(Verb.TCM_ABORT_MIG, init), ep);
     }
diff --git 
a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java 
b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
index deef0a4b1b..63fcfc8180 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/MessageSerializers.java
@@ -35,47 +35,47 @@ import 
org.apache.cassandra.tcm.migration.CMSInitializationRequest;
  * NOTE: Serialization version here is used for convenience of serializing the 
message
  * on the outgoing path. Since receiving node may have a different view of
  * min serialization version, we _always_ have to either use a {@link 
VerboseMetadataSerializer}
- * (like {@link LogState}/ {@link Replication} or explicitly serialize the 
version (like {@link Commit}).
+ * (like {@link LogState}) or explicitly serialize the version (like {@link 
Commit}).
  */
 public class MessageSerializers
 {
     public static IVersionedSerializer<LogState> logStateSerializer()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null || 
metadata.directory.clusterMinVersion.serializationVersion == 
NodeVersion.CURRENT.serializationVersion)
+        if (metadata == null || 
metadata.directory.commonSerializationVersion.asInt() == 
NodeVersion.CURRENT.serializationVersion)
             return LogState.defaultMessageSerializer;
 
-        assert 
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
-        return 
LogState.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+        assert 
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+        return 
LogState.messageSerializer(metadata.directory.commonSerializationVersion);
     }
 
     public static IVersionedSerializer<Commit.Result> commitResultSerializer()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null || 
metadata.directory.clusterMinVersion.serializationVersion == 
NodeVersion.CURRENT.serializationVersion)
+        if (metadata == null || 
metadata.directory.commonSerializationVersion.asInt() == 
NodeVersion.CURRENT.serializationVersion)
             return Commit.Result.defaultMessageSerializer;
 
-        assert 
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
-        return 
Commit.Result.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+        assert 
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+        return 
Commit.Result.messageSerializer(metadata.directory.commonSerializationVersion);
     }
 
     public static IVersionedSerializer<Commit> commitSerializer()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null || 
metadata.directory.clusterMinVersion.serializationVersion == 
NodeVersion.CURRENT.serializationVersion)
+        if (metadata == null || 
metadata.directory.commonSerializationVersion.asInt() == 
NodeVersion.CURRENT.serializationVersion)
             return Commit.defaultMessageSerializer;
 
-        assert 
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
-        return 
Commit.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+        assert 
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+        return 
Commit.messageSerializer(metadata.directory.commonSerializationVersion);
     }
 
     public static IVersionedSerializer<CMSInitializationRequest> 
initRequestSerializer()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata == null || 
metadata.directory.clusterMinVersion.serializationVersion == 
NodeVersion.CURRENT.serializationVersion)
+        if (metadata == null || 
metadata.directory.commonSerializationVersion.asInt() == 
NodeVersion.CURRENT.serializationVersion)
             return CMSInitializationRequest.defaultMessageSerializer;
 
-        assert 
!metadata.directory.clusterMinVersion.serializationVersion().equals(NodeVersion.CURRENT.serializationVersion());
-        return 
CMSInitializationRequest.messageSerializer(metadata.directory.clusterMinVersion.serializationVersion());
+        assert 
!metadata.directory.commonSerializationVersion.equals(NodeVersion.CURRENT.serializationVersion());
+        return 
CMSInitializationRequest.messageSerializer(metadata.directory.commonSerializationVersion);
     }
 }
diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java 
b/src/java/org/apache/cassandra/tcm/serialization/Version.java
index c4fb39cd51..9cbcb8587a 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/Version.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java
@@ -29,7 +29,13 @@ import org.apache.cassandra.tcm.membership.NodeVersion;
 
 public enum Version
 {
+    /**
+     * Used for nodes on older, pre-CEP-21 release versions
+     */
     OLD(-1),
+    /**
+     * Initial version
+     */
     V0(0),
     /**
      *  - Moved Partitioner in ClusterMetadata serializer to be the first field
@@ -81,6 +87,7 @@ public enum Version
     public static final Version MIN_ACCORD_VERSION = V7;
 
     private static Map<Integer, Version> values = new HashMap<>();
+
     static
     {
         for (Version v : values())
@@ -99,10 +106,9 @@ public enum Version
     public static Version minCommonSerializationVersion()
     {
         ClusterMetadata metadata = ClusterMetadata.currentNullable();
-        if (metadata != null)
-            return metadata.directory.clusterMinVersion.serializationVersion();
-        return NodeVersion.CURRENT.serializationVersion();
-
+        return metadata != null
+               ? metadata.directory.commonSerializationVersion
+               : NodeVersion.CURRENT.serializationVersion();
     }
 
     public int asInt()
@@ -120,11 +126,21 @@ public enum Version
         return version >= other.version;
     }
 
+    public boolean isEqualOrBefore(Version version)
+    {
+        return this.version <= version.version;
+    }
+
     public boolean isBefore(Version other)
     {
         return version < other.version;
     }
 
+    public boolean isAfter(Version other)
+    {
+        return version > other.version;
+    }
+
     public static Version fromInt(int i)
     {
         Version v = values.get(i);
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java 
b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
index 11c2ed4d31..9564372c6f 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/cms/Initialize.java
@@ -61,6 +61,12 @@ public class Initialize extends ForceSnapshot
         super(baseState);
     }
 
+    @Override
+    public boolean allowDuringUpgrades()
+    {
+        return true;
+    }
+
     public Kind kind()
     {
         return Kind.INITIALIZE_CMS;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java 
b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
index 8c5dc0b0dc..40fb38464c 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java
@@ -34,6 +34,7 @@ import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
 import picocli.CommandLine.Parameters;
 
+import static org.apache.cassandra.tcm.CMSOperations.CMS_ID;
 import static org.apache.cassandra.tcm.CMSOperations.COMMITS_PAUSED;
 import static org.apache.cassandra.tcm.CMSOperations.EPOCH;
 import static org.apache.cassandra.tcm.CMSOperations.IS_MEMBER;
@@ -73,6 +74,7 @@ public class CMSAdmin extends AbstractCommand
         {
             Map<String, String> info = 
probe.getCMSOperationsProxy().describeCMS();
             output.out.printf("Cluster Metadata Service:%n");
+            output.out.printf("CMS Identifier: %s%n", info.get(CMS_ID));
             output.out.printf("Members: %s%n", info.get(MEMBERS));
             output.out.printf("Needs reconfiguration: %s%n", 
info.get(NEEDS_RECONFIGURATION));
             output.out.printf("Is Member: %s%n", info.get(IS_MEMBER));
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
similarity index 52%
rename from 
test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
rename to 
test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
index 51267db100..3117abaed9 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeCleanupPreInitializeTest.java
@@ -22,11 +22,16 @@ import org.junit.Test;
 
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.tcm.Transformation;
 
-public class ClusterMetadataUpgradeIgnoreHostTest extends UpgradeTestBase
+import static org.junit.Assert.assertTrue;
+
+
+public class ClusterMetadataUpgradeCleanupPreInitializeTest extends 
UpgradeTestBase
 {
+
     @Test
-    public void upgradeIgnoreHostsTest() throws Throwable
+    public void cleanupPreInitializeTest() throws Throwable
     {
         new TestCase()
         .nodes(3)
@@ -39,12 +44,35 @@ public class ClusterMetadataUpgradeIgnoreHostTest extends 
UpgradeTestBase
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v int, PRIMARY KEY (pk, ck))");
         })
         .runAfterClusterUpgrade((cluster) -> {
-            // todo; isolate node 3 - actually shutting it down makes us throw 
exceptions when test finishes
-            cluster.filters().allVerbs().to(3).drop();
-            cluster.filters().allVerbs().from(3).drop();
-            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().failure(); // node3 unreachable
-            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", 
"127.0.0.1").asserts().failure(); // can't ignore localhost
-            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", 
"127.0.0.3").asserts().success();
+            cluster.get(1).executeInternal("INSERT INTO 
system.local_metadata_log (epoch, kind) VALUES (1, 0)");
+            cluster.get(1).flush("system");
+            cluster.get(1).shutdown().get();
+            cluster.get(1).startup();
+            cluster.get(1).logs().watchFor("Cleaning up orphaned PreInitialize 
at epoch 1");
+            cluster.get(1).nodetoolResult("cms", 
"initialize").asserts().success();
+            cluster.get(1).shutdown().get();
+            cluster.get(1).startup();
+            boolean seenPreInit = false;
+            boolean seenInit = false;
+            boolean seenSnapshot = false;
+            for (Object [] row : cluster.get(1).executeInternal("SELECT epoch, 
kind FROM system.local_metadata_log"))
+            {
+                switch (Transformation.Kind.fromId((Integer)row[1]))
+                {
+                    case PRE_INITIALIZE_CMS:
+                        seenPreInit = true;
+                        break;
+                    case INITIALIZE_CMS:
+                        seenInit = true;
+                        break;
+                    case TRIGGER_SNAPSHOT:
+                        seenSnapshot = true;
+                        break;
+                }
+            }
+            assertTrue(seenPreInit);
+            assertTrue(seenInit);
+            assertTrue(seenSnapshot);
         }).run();
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
new file mode 100644
index 0000000000..dc16786f0b
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeDelayedInitializeTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Shared;
+import org.awaitility.Awaitility;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertEquals;
+
+public class ClusterMetadataUpgradeDelayedInitializeTest extends UpgradeTestBase
+{
+    private static final Logger logger = LoggerFactory.getLogger(ClusterMetadataUpgradeDelayedInitializeTest.class);
+
+    /**
+     * Verifies that while CMS initialization is paused between committing the PreInitialize and Initialize
+     * transformations, read/write/gossip traffic does not cause the initiator's peers to catch up from it, and
+     * that initialization completes on every node once the pause is released.
+     */
+    @Test
+    public void delayedInitializationTest() throws Throwable
+    {
+        // In this test we force the initiator to pause between committing the PreInitialize and Initialize, then verify
+        // that read/write/gossip messages exchanged whilst in this state do not cause the other nodes to fetch the
+        // PreInitialize log entry. We inject that pause using ByteBuddy to add a wait on a CountDownLatch controlled
+        // from the main test code.
+        final int nodeCount = 3;
+        Versions dtestVersions = Versions.find();
+        Versions.Version earliestVersion = dtestVersions.getLatest(v41);
+        Consumer<IInstanceConfig> configUpdater = config -> config.with(Feature.NETWORK, Feature.GOSSIP);
+        Consumer<UpgradeableCluster.Builder> builderUpdater = builder -> builder.withInstanceInitializer(ClusterMetadataUpgradeDelayedInitializeTest.BBInstaller::installUpgradeVersionBB);
+        try (UpgradeableCluster cluster = UpgradeableCluster.create(nodeCount, earliestVersion, configUpdater, builderUpdater))
+        {
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int PRIMARY KEY)");
+
+            // Upgrade all instances
+            Versions.Version upgradeVersion = dtestVersions.getLatest(CURRENT);
+            for (IUpgradeableInstance i : cluster)
+                upgradeInstance(i, upgradeVersion);
+            logger.info("Upgrade complete. Current version: {}", upgradeVersion);
+
+            // Start the CMS initialization, which should pause after committing the PreInitialize transform
+            assertEquals(1, BBState.latch.getCount());
+            ExecutorService initExecutor = Executors.newSingleThreadExecutor();
+            try
+            {
+                IUpgradeableInstance initiator = cluster.get(1);
+                Future<NodeToolResult> result = initExecutor.submit(() -> initiator.nodetoolResult("cms", "initialize"));
+                logger.info("Waiting for initiator to pause");
+                Awaitility.waitAtMost(30, TimeUnit.SECONDS).until(() -> {
+                    // wait until the initiator has committed the PreInitialize entry
+                    NodeToolResult res = initiator.nodetoolResult("cms");
+                    return res.getStdout().contains("Epoch: 1");
+                });
+                logger.info("Initiator paused");
+
+                // internode messages for reads/writes should not trigger catchup between node1 and its peers
+                for (int i = 1; i <= nodeCount; i++)
+                {
+                    cluster.coordinator(i).execute(withKeyspace("INSERT INTO %s.tbl (k) VALUES (0)"), ConsistencyLevel.ALL);
+                    cluster.coordinator(i).execute(withKeyspace("SELECT * FROM %s.tbl WHERE k=0"), ConsistencyLevel.ALL);
+                }
+                // nor should gossip messages
+                awaitGossipUpdateFromPeer(cluster.get(2), initiator);
+                awaitGossipUpdateFromPeer(cluster.get(3), initiator);
+
+                // Assert that nodes 2 & 3 did not catch up from node 1
+                cluster.get(2).nodetoolResult("cms").asserts().success().stdoutContains("Epoch: -9223372036854775807");
+                cluster.get(3).nodetoolResult("cms").asserts().success().stdoutContains("Epoch: -9223372036854775807");
+
+                logger.info("Allowing initiator to continue");
+                BBState.latch.countDown();
+                NodeToolResult initRes = result.get(30, TimeUnit.SECONDS);
+                initRes.asserts().success();
+
+                // Verify that the initialization completes
+                Awaitility.waitAtMost(30, TimeUnit.SECONDS)
+                          .until(() -> cluster.get(1).nodetoolResult("cms").getStdout().contains("Epoch: 3") &&
+                                       cluster.get(2).nodetoolResult("cms").getStdout().contains("Epoch: 3") &&
+                                       cluster.get(3).nodetoolResult("cms").getStdout().contains("Epoch: 3"));
+            }
+            finally
+            {
+                // If an assertion failed while the initiator was paused, release it so it is not left blocked in
+                // the interceptor while the cluster shuts down (countDown on a released latch is a no-op), and
+                // don't leak the executor thread.
+                BBState.latch.countDown();
+                initExecutor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Inspects gossip state on one peer and waits (up to 30 seconds) for the heartbeat version of another peer to
+     * increase. This is not an absolute guarantee of direct communication between the two nodes, but should be
+     * good enough.
+     *
+     * @param waiter the node doing the checking & waiting to observe an update
+     * @param waitingFor the node whose heartbeat is being observed
+     * @throws RuntimeException if no heartbeat update is observed before the deadline
+     */
+    private void awaitGossipUpdateFromPeer(IUpgradeableInstance waiter, IUpgradeableInstance waitingFor)
+    {
+        String targetEndpoint = waitingFor.config().getString("broadcast_address");
+        ((IInvokableInstance) waiter).runOnInstance(() -> {
+            InetAddressAndPort endpoint = InetAddressAndPort.getByNameUnchecked(targetEndpoint);
+            final int before = Gossiper.instance.getEndpointStateForEndpoint(endpoint)
+                                                .getHeartBeatState()
+                                                .getHeartBeatVersion();
+
+            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
+            while (Gossiper.instance.getEndpointStateForEndpoint(endpoint)
+                                    .getHeartBeatState()
+                                    .getHeartBeatVersion() <= before)
+            {
+                FBUtilities.sleepQuietly(100);
+                // compare by subtraction so the check stays correct even if nanoTime wraps
+                if (System.nanoTime() - deadline > 0)
+                    throw new RuntimeException("Timed out waiting for gossip state to update");
+            }
+        });
+    }
+
+    /**
+     * Cleanly shuts down an instance, switches it to {@code upgradeTo} and starts it again.
+     */
+    private void upgradeInstance(IUpgradeableInstance instance, Versions.Version upgradeTo) throws ExecutionException, InterruptedException, TimeoutException
+    {
+        int instanceId = instance.config().num();
+        logger.info("Shutting down instance {} to upgrade to {}", instanceId, upgradeTo.version);
+        instance.shutdown(true).get(60, TimeUnit.SECONDS);
+        logger.info("Starting instanceId {} on version {}", instanceId, upgradeTo.version);
+        instance.setVersion(upgradeTo);
+        instance.startup();
+        logger.info("Started instanceId {}", instanceId);
+    }
+
+    /**
+     * State shared between the test code and the instances' class loaders: the latch that holds the initiator
+     * inside {@code initializeClusterIdentifier} until the test releases it.
+     */
+    @Shared
+    public static class BBState
+    {
+        public static final CountDownLatch latch = new CountDownLatch(1);
+    }
+
+    public static class BBInstaller
+    {
+        /**
+         * Rebases {@code ClusterMetadata} in the given instance class loader so that
+         * {@code initializeClusterIdentifier} is delegated to {@link BBInterceptor}.
+         * A {@link NoClassDefFoundError} is logged and ignored; any other failure is logged and rethrown.
+         */
+        public static void installUpgradeVersionBB(ClassLoader classLoader, Integer num)
+        {
+            try
+            {
+                new ByteBuddy().rebase(ClusterMetadata.class)
+                               .method(named("initializeClusterIdentifier"))
+                               .intercept(MethodDelegation.to(ClusterMetadataUpgradeDelayedInitializeTest.BBInterceptor.class))
+                               .make()
+                               .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+            }
+            catch (NoClassDefFoundError noClassDefFoundError)
+            {
+                logger.info("ClusterMetadata class not available for instance {}, skipping interception", num, noClassDefFoundError);
+            }
+            catch (Throwable tr)
+            {
+                logger.info("Unable to intercept initializeClusterIdentifier method", tr);
+                throw tr;
+            }
+        }
+    }
+
+    public static class BBInterceptor
+    {
+        /**
+         * Blocks (for at most 60 seconds) on {@link BBState#latch} before delegating to the real
+         * {@code initializeClusterIdentifier}. Any failure is wrapped in a {@link RuntimeException}.
+         */
+        @SuppressWarnings("unused")
+        public static ClusterMetadata initializeClusterIdentifier(@SuperCall Callable<ClusterMetadata> zuper)
+        {
+            try
+            {
+                logger.info("initializeClusterIdentifier waiting...");
+                if (!BBState.latch.await(60, TimeUnit.SECONDS))
+                    logger.warn("initializeClusterIdentifier timed out waiting to be released, continuing anyway");
+                logger.info("initializeClusterIdentifier continuing...");
+                return zuper.call();
+            }
+            catch (InterruptedException e)
+            {
+                // preserve the interrupt status for the caller
+                Thread.currentThread().interrupt();
+                logger.info("error:", e);
+                throw new RuntimeException(e);
+            }
+            catch (Throwable e)
+            {
+                logger.info("error:", e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
new file mode 100644
index 0000000000..e5cbcb7555
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeIgnoreHostsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class ClusterMetadataUpgradeIgnoreHostsTest extends UpgradeTestBase
+{
+    // Locates the metadata identifier in `nodetool cms` output. Requiring at least one digit means missing output
+    // fails the assertion cleanly instead of surfacing as a NumberFormatException; the optional sign is defensive
+    // in case the identifier is ever printed as a negative value.
+    private static final Pattern CMS_IDENTIFIER = Pattern.compile("CMS Identifier: (-?\\d+)");
+
+    /**
+     * With node 3 unreachable, {@code cms initialize} must fail unless node 3 is explicitly ignored, and the
+     * local node (127.0.0.1) may not be ignored.
+     */
+    @Test
+    public void upgradeIgnoreHostsTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2, 3)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                                .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+        .upgradesToCurrentFrom(v41)
+        .setup((cluster) -> {
+            cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            // TODO: node 3 is isolated with message filters rather than shut down, because actually shutting it
+            //       down makes us throw exceptions when the test finishes
+            cluster.filters().allVerbs().to(3).drop();
+            cluster.filters().allVerbs().from(3).drop();
+            cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure(); // node3 unreachable
+            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.1").asserts().failure(); // can't ignore localhost
+            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success();
+            cluster.get(1).nodetoolResult("cms", "reconfigure", "2").asserts().success();
+        }).run();
+    }
+
+    /**
+     * Initializes the CMS while node 3 is isolated and not upgraded, then upgrades node 3 and checks that its
+     * epoch advances and that every node reports the same, non-empty, metadata identifier.
+     */
+    @Test
+    public void upgradeIgnoreHostsNonUpgradedTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                                .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+        .upgradesToCurrentFrom(v41)
+        .setup((cluster) -> {
+            cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            cluster.filters().allVerbs().to(3).drop();
+            cluster.filters().allVerbs().from(3).drop();
+            cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success();
+            cluster.get(3).shutdown().get();
+            cluster.filters().reset();
+            cluster.get(3).setVersion(Versions.find().getLatest(CURRENT));
+            cluster.get(3).startup();
+            cluster.schemaChange(withKeyspace("ALTER TABLE " + KEYSPACE + ".tbl with comment = 'test'"));
+            ((IInvokableInstance)cluster.get(3)).runOnInstance(() -> {
+                Epoch current = ClusterMetadata.current().epoch;
+                if (current.isBefore(Epoch.FIRST))
+                    throw new AssertionError("Epoch was not incremented as expected, still at " + current);
+            });
+
+            // Verify that CMS identifier has propagated across the nodes as this asserts that the DOWN node
+            // did not affect the common serialization version for metadata.
+            Set<Long> identifiers = new HashSet<>();
+            cluster.forEach(i -> {
+                String stdout = i.nodetoolResult("cms").getStdout();
+                Matcher m = CMS_IDENTIFIER.matcher(stdout);
+                assertTrue("No CMS Identifier found in nodetool cms output: " + stdout, m.find());
+                identifiers.add(Long.parseLong(m.group(1)));
+            });
+            assertEquals(1, identifiers.size());
+            assertNotEquals(ClusterMetadata.EMPTY_METADATA_IDENTIFIER, (long) identifiers.iterator().next());
+            cluster.get(1).nodetoolResult("cms", "reconfigure", "2").asserts().success();
+        }).run();
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java
new file mode 100644
index 0000000000..8ab8dda4cc
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeUnexpectedFailureTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.migration.Election;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+public class ClusterMetadataUpgradeUnexpectedFailureTest extends UpgradeTestBase
+{
+    /**
+     * Injects an unexpected exception into the first call to {@code Election.finish} and checks that the failed
+     * {@code cms initialize} attempt cleans up after itself so that a second attempt succeeds.
+     */
+    @Test
+    public void upgradeFailsUnexpectedlyAfterInitiation() throws Throwable
+    {
+        Consumer<UpgradeableCluster.Builder> builderUpdater = builder -> builder.withInstanceInitializer(BBInstaller::installUpgradeVersionBB);
+        new TestCase()
+                .nodes(3)
+                .nodesToUpgrade(1, 2, 3)
+                .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)
+                        .set(Constants.KEY_DTEST_FULL_STARTUP, true))
+                .upgradesToCurrentFrom(v41)
+                .withBuilder(builderUpdater)
+                .setup((cluster) -> {
+                    cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}"));
+                    cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+                })
+                .runAfterClusterUpgrade((cluster) -> {
+                    // we injected a BB helper to trigger an unexpected failure on the first attempt at initialization.
+                    // i.e. an exception that must be caught as opposed to a mismatch in metadata or down peer.
+                    cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure().errorContains("Something unexpected went wrong");
+                    // handling the failure should have included cleaning up any state so that another attempt can be
+                    // made, which this time should succeed.
+                    cluster.get(1).nodetoolResult("cms", "initialize").asserts().success();
+                }).run();
+    }
+
+    public static class BBInstaller
+    {
+        /**
+         * Rebases {@link Election} in the given instance class loader so that calls to {@code finish} are
+         * delegated to {@link BBInterceptor}. Any failure propagates to the caller unchanged.
+         */
+        public static void installUpgradeVersionBB(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(Election.class)
+                           .method(named("finish"))
+                           .intercept(MethodDelegation.to(BBInterceptor.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+    }
+
+    public static class BBInterceptor
+    {
+        // true until the first intercepted call to finish has been seen by this class
+        private static final AtomicBoolean firstAttempt = new AtomicBoolean(true);
+
+        /**
+         * Throws {@link IllegalStateException} on the first invocation, otherwise delegates to the real
+         * {@code Election.finish}, wrapping any checked exception in a {@link RuntimeException}.
+         */
+        @SuppressWarnings("unused")
+        public static void finish(Set<InetAddressAndPort> sendTo, @SuperCall Callable<Void> zuper)
+        {
+            if (firstAttempt.getAndSet(false))
+                throw new IllegalStateException("Something unexpected went wrong");
+
+            try
+            {
+                zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
index ea66961ee5..790ac3380c 100644
--- a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
+++ b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
@@ -20,6 +20,9 @@ package org.apache.cassandra.tcm.membership;
 
 import org.junit.Test;
 
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CassandraVersion;
+
 import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -111,6 +114,80 @@ public class DirectoryTest
         assertTrue(dir.allDatacenterRacks().isEmpty());
     }
 
+    /**
+     * The directory's common serialization version defaults to {@link NodeVersion#CURRENT_METADATA_VERSION} when
+     * empty, tracks the lowest version among its nodes, ignores nodes reporting {@link Version#OLD}, and reverts
+     * to the current version once the lower-versioned nodes are removed.
+     */
+    @Test
+    public void commonSerializationVersionTest()
+    {
+        int cnt = 1;
+        Location DC1_R1 = new Location("datacenter1", "rack1");
+        Directory dir = new Directory();
+        assertTrue(dir.isEmpty());
+        assertEquals(NodeVersion.CURRENT_METADATA_VERSION, dir.commonSerializationVersion);
+
+        dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+        assertEquals(NodeVersion.CURRENT.serializationVersion, dir.commonSerializationVersion.asInt());
+
+        // each node added with a lower serialization version lowers the common version
+        dir = dir.with(addresses(cnt++), DC1_R1, withSerializationVersion(Version.V4));
+        assertEquals(Version.V4, dir.commonSerializationVersion);
+
+        dir = dir.with(addresses(cnt++), DC1_R1, withSerializationVersion(Version.V3));
+        assertEquals(Version.V3, dir.commonSerializationVersion);
+
+        dir = dir.with(addresses(cnt++), DC1_R1, withSerializationVersion(Version.V2));
+        assertEquals(Version.V2, dir.commonSerializationVersion);
+
+        dir = dir.with(addresses(cnt++), DC1_R1, withSerializationVersion(Version.V1));
+        assertEquals(Version.V1, dir.commonSerializationVersion);
+
+        // V0, the lowest version exercised here, becomes the floor
+        dir = dir.with(addresses(cnt++), DC1_R1, withSerializationVersion(Version.V0));
+        assertEquals(Version.V0, dir.commonSerializationVersion);
+
+        // Adding another, higher version doesn't affect the floor
+        dir = dir.with(addresses(cnt++), DC1_R1, NodeVersion.CURRENT);
+        assertEquals(Version.V0, dir.commonSerializationVersion);
+
+        // Version.OLD for pre-CEP-21 nodes doesn't affect the floor. cnt is deliberately not incremented here so
+        // it still identifies the last node added, which the removal loop below starts from.
+        dir = dir.with(addresses(cnt), DC1_R1, withSerializationVersion(Version.OLD));
+        assertEquals(Version.V0, dir.commonSerializationVersion);
+
+        // remove all peers except the first, common version should revert to CURRENT
+        // (assumes node ids were assigned sequentially from 1 in insertion order)
+        while (cnt > 1)
+            dir = dir.without(dir.lastModified().nextEpoch(), new NodeId(cnt--));
+
+        assertEquals(NodeVersion.CURRENT.serializationVersion, dir.commonSerializationVersion.asInt());
+    }
+
+    /**
+     * A node in the LEFT state no longer contributes to the directory's common serialization version.
+     */
+    @Test
+    public void commonSerializationVersionLEFTNodesTest()
+    {
+        Location dc1Rack1 = new Location("datacenter1", "rack1");
+        Directory directory = new Directory();
+        assertTrue(directory.isEmpty());
+
+        // three nodes at the current version, on endpoints 0..2
+        for (int i = 0; i < 3; i++)
+            directory = directory.with(addresses(i), dc1Rack1, NodeVersion.CURRENT);
+
+        // a fourth node with a lower serialization version drags the common version down...
+        NodeAddresses leaving = addresses(3);
+        directory = directory.with(leaving, dc1Rack1, withSerializationVersion(Version.V4));
+        assertEquals(Version.V4, directory.commonSerializationVersion);
+
+        // ...until it is marked LEFT
+        NodeId leavingId = directory.peerId(leaving.broadcastAddress);
+        directory = directory.withNodeState(leavingId, NodeState.LEFT);
+        assertEquals(NodeVersion.CURRENT_METADATA_VERSION, directory.commonSerializationVersion);
+    }
+
+
+    /** Builds a {@link NodeAddresses} from the test endpoint with the given index. */
+    private static NodeAddresses addresses(int index)
+    {
+        NodeAddresses nodeAddresses = new NodeAddresses(endpoint(index));
+        return nodeAddresses;
+    }
+
+    /** Creates a NodeVersion using {@code CassandraVersion.NULL_VERSION} and the given serialization version. */
+    private static NodeVersion withSerializationVersion(Version serializationVersion)
+    {
+        CassandraVersion releaseVersion = CassandraVersion.NULL_VERSION;
+        return new NodeVersion(releaseVersion, serializationVersion);
+    }
+
     private void assertInvalidLocationUpdate(Directory dir, NodeId nodeId, 
Location loc, String message)
     {
         try
diff --git a/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java b/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java
new file mode 100644
index 0000000000..3530570a1b
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/membership/NodeVersionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class NodeVersionTest
+{
+    @Test
+    public void futureNodeVersionTest() throws IOException
+    {
+        ByteBuffer bb;
+        try (DataOutputBuffer dob = new DataOutputBuffer())
+        {
+            NodeVersion.Serializer.serializeHelper(dob, "8.0.0", 
NodeVersion.CURRENT.serializationVersion + 1);
+            bb = dob.asNewBuffer();
+        }
+
+        try (DataInputBuffer in = new DataInputBuffer(bb, false))
+        {
+            NodeVersion n = NodeVersion.serializer.deserialize(in, 
NodeVersion.CURRENT.serializationVersion());
+            assertEquals(n.serializationVersion().asInt(), 
Version.UNKNOWN.asInt());
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to