This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7695a6220 Forbid nodes upgrading to a version which cannot read existing log entries
a7695a6220 is described below

commit a7695a62200c3085fea634b9fdc8d393a057458e
Author: Aparna Naik <[email protected]>
AuthorDate: Fri Feb 13 12:03:36 2026 -0800

    Forbid nodes upgrading to a version which cannot read existing log entries
    
    Patch by Aparna Naik; reviewed by Marcus Eriksson and Sam Tunnicliffe
    for CASSANDRA-21174
---
 CHANGES.txt                                        |   1 +
 .../cassandra/tcm/transformations/Register.java    |  18 +++
 .../cassandra/tcm/transformations/Startup.java     |  15 +++
 ...compatibleMetadataSerializationVersionTest.java | 111 +++++++++++++++
 .../distributed/test/log/RegisterTest.java         |  46 ++++---
 test/unit/org/apache/cassandra/Util.java           |  12 --
 .../InsertUpdateIfConditionCollectionsTest.java    |  17 +--
 .../InsertUpdateIfConditionStaticsTest.java        |  14 +-
 .../operations/InsertUpdateIfConditionTest.java    |  60 ++++++---
 .../apache/cassandra/tcm/ClusterMetadataTest.java  |  15 ++-
 .../tcm/transformations/RegisterTest.java          | 150 +++++++++++++++++++++
 .../cassandra/tcm/transformations/StartupTest.java | 112 +++++++++++++++
 12 files changed, 490 insertions(+), 81 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 12c457895a..701f7bf02c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Forbid nodes upgrading to a version which cannot read existing log entries (CASSANDRA-21174)
  * Introduce a check for minimum time to pass to train or import a compression dictionary from the last one (CASSANDRA-21179)
  * Allow overriding compaction strategy parameters during startup (CASSANDRA-21169)
  * Introduce created_at column to system_distributed.compression_dictionaries (CASSANDRA-21178)
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java b/src/java/org/apache/cassandra/tcm/transformations/Register.java
index 0b3ebb375f..1e31ff3a85 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Register.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java
@@ -74,6 +74,24 @@ public class Register implements Transformation
     @Override
     public Result execute(ClusterMetadata prev)
     {
+        // Ensure the joining node can read existing cluster metadata.
+        // Skip check for empty directory (first node in a new cluster).
+        if (!prev.directory.isEmpty())
+        {
+            Version clusterVersion = prev.directory.commonSerializationVersion;
+            Version newNodeVersion = version.serializationVersion();
+            if (newNodeVersion.isBefore(clusterVersion))
+            {
+                return new Rejected(INVALID,
+                                    String.format("Cannot register node: this 
node's metadata serialization version %s " +
+                                                  "is lower than the cluster's 
minimum required version %s. " +
+                                                  "Node would not be able to 
read cluster metadata. " +
+                                                  "Please upgrade the node to 
a Cassandra version that supports " +
+                                                  "metadata serialization 
version %s or higher before joining the cluster.",
+                                                  newNodeVersion, 
clusterVersion, clusterVersion));
+            }
+        }
+
         for (Map.Entry<NodeId, NodeAddresses> entry : 
prev.directory.addresses.entrySet())
         {
             NodeAddresses existingAddresses = entry.getValue();
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
index a1abfdbff4..7b8c4cff7e 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
@@ -68,6 +68,21 @@ public class Startup implements Transformation
     @Override
     public Result execute(ClusterMetadata prev)
     {
+        // Prevent downgrade to a version that cannot read cluster metadata.
+        // This protects against restarting a node with an older binary.
+        Version clusterVersion = prev.directory.commonSerializationVersion;
+        Version newNodeVersion = nodeVersion.serializationVersion();
+        if (newNodeVersion.isBefore(clusterVersion))
+        {
+            return new Rejected(INVALID,
+                                String.format("Cannot start node: this node's 
metadata serialization version %s " +
+                                              "is lower than the cluster's 
minimum required version %s. " +
+                                              "Node would not be able to read 
cluster metadata. " +
+                                              "Please upgrade the node to a 
Cassandra version that supports " +
+                                              "metadata serialization version 
%s or higher before restarting.",
+                                              newNodeVersion, clusterVersion, 
clusterVersion));
+        }
+
         ClusterMetadata.Transformer next = prev.transformer();
         if (!prev.directory.addresses.get(nodeId).equals(addresses))
         {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java
new file mode 100644
index 0000000000..84e461b6e4
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static 
org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION;
+
+public class IncompatibleMetadataSerializationVersionTest extends TestBaseImpl
+{
+    @Test
+    public void incompatibleVersionsCauseStartupFailureTest() throws Throwable
+    {
+        try (Cluster cluster = builder().withNodes(2)
+                                        .withInstanceInitializer(BB::install)
+                                        .createWithoutStarting())
+        {
+            cluster.get(1).startup();
+            // node1 has joined as normal so any entries committed to the 
metadata log will be serialized with
+            // NodeVersion.CURRENT_METADATA_VERSION. We will join node2, but 
the BB class used as an instanceInitializer
+            // will force it not to recognise this version. This simulates a 
node running an older, incompatible version
+            // attempting to join the cluster and should fail as the metadata 
log and snapshots it receives at startup
+            // are unreadable to it.
+            // We'll also set up the uncaught exceptions filter so that errors 
reported by node2 do not automatically
+            // trigger a failure, so that we can assert that the specific 
error we're expecting is thrown and logged.
+            cluster.setUncaughtExceptionsFilter((i, t) -> i != 2);
+            try
+            {
+                cluster.get(2).startup();
+                Assert.fail("Node2 startup should fail due to unsupported 
metadata versions");
+            }
+            catch (Exception e)
+            {
+                String expectedError = String.format("Unsupported metadata 
version \\(%s\\)", CURRENT_METADATA_VERSION.asInt());
+                Assert.assertFalse(cluster.get(2)
+                                          .logs()
+                                          .grep(expectedError)
+                                          .getResult()
+                                          .isEmpty());
+            }
+        }
+    }
+
+    public static class BB
+    {
+        static void install(ClassLoader cl, int node)
+        {
+            // only change behaviour of node2
+            if (node == 2)
+            {
+                new ByteBuddy().rebase(Version.class)
+                               .method(named("fromInt"))
+                               .intercept(MethodDelegation.to(BB.class))
+                               .make()
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+
+                new ByteBuddy().rebase(NodeVersion.class)
+                               .method(named("serializationVersion"))
+                               .intercept(MethodDelegation.to(BB.class))
+                               .make()
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        public static Version serializationVersion()
+        {
+            // This is called during node startup when initializing the 
LogState class and in particular its static
+            // defaultMessageSerializer field. We will emulate the behaviour 
of a node running an old version.
+            return Version.V0;
+        }
+
+        public static Version fromInt(int i)
+        {
+            // Behave as if the supplied version is invalid, unless it is the 
V0 value we are returning from the other
+            // intercepted method. This will cause any other version 
encountered, such as when receiving versioned log
+            // entries from another node, to appear unreadable.
+            if (i == Version.V0.asInt())
+                return Version.V0;
+
+            throw new IllegalArgumentException("Unsupported metadata version 
(" + i + ")");
+        }
+    }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 393e9efd77..81006bbfc0 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -49,7 +49,6 @@ import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.Register;
-import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
 import org.apache.cassandra.tcm.transformations.Unregister;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -58,6 +57,8 @@ import static org.junit.Assert.assertEquals;
 
 public class RegisterTest extends TestBaseImpl
 {
+    private static final Location TEST_LOCATION = new Location("datacenter1", 
"rack1");
+
     @Test
     public void testRegistrationIdempotence() throws Throwable
     {
@@ -103,28 +104,28 @@ public class RegisterTest extends TestBaseImpl
         try (Cluster cluster = builder().withNodes(1)
                                         .createWithoutStarting())
         {
-            final String firstNodeEndpoint = "127.0.0.10";
             cluster.get(1).startup();
             cluster.get(1).runOnInstance(() -> {
                 try
                 {
-                    // Register a ghost node with V0 to fake-force V0 
serialization. In a real world cluster we will always be upgrading from a 
smaller version.
-                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
-                                                                          
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
+                    // Unregister to make directory empty
+                    ClusterMetadataService.instance().commit(new 
Unregister(ClusterMetadata.current().myNodeId(),
+                                                                            
EnumSet.allOf(NodeState.class),
+                                                                            
ClusterMetadataService.instance().placementProvider()));
+
+                    // Register a ghost node with V0 (bypasses version check 
because directory is now empty).
+                    // In a real world cluster we will always be upgrading 
from a smaller version.
+                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
+                                                                          
TEST_LOCATION,
                                                                           new 
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
-                    NodeId oldNode = 
ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
-                    // Fake an upgrade of this node and assert we continue to 
serialize so that the one which only
-                    // supports V0 can deserialize. In a real cluster it 
wouldn't happen exactly in this way (here the
-                    // min serialization version actually goes backwards from 
CURRENT to V0 when we upgrade, which would
-                    // not happen in a real cluster as we would never register 
like oldNode, with the current C* version
-                    // but an older metadata version
+                    NodeId oldNode = 
ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName("127.0.0.100"));
+
+                    // Register a node with upgraded version
                     CassandraVersion currentVersion = 
NodeVersion.CURRENT.cassandraVersion;
                     NodeVersion upgraded = new NodeVersion(new 
CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
                                                             
NodeVersion.CURRENT_METADATA_VERSION);
-                    ClusterMetadata metadata = ClusterMetadata.current();
-                    NodeId id = metadata.myNodeId();
-                    Startup startup = new Startup(id, 
metadata.directory.getNodeAddresses(id), upgraded);
-                    ClusterMetadataService.instance().commit(startup);
+                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName("127.0.0.200")), TEST_LOCATION, 
upgraded));
+
                     // Doesn't matter which specific Transformation we use 
here, we're testing that the serializer uses
                     // the correct lower bound
                     Transformation t = new Register(NodeAddresses.current(), 
new Location("DC", "RACK"), NodeVersion.CURRENT);
@@ -173,9 +174,15 @@ public class RegisterTest extends TestBaseImpl
             cluster.get(1).runOnInstance(() -> {
                 try
                 {
-                    // Register a ghost node with V0 to fake-force V0 
serialization. In a real world cluster we will always be upgrading from a 
smaller version.
-                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
-                                                                          
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
+                    // Unregister to make directory empty
+                    ClusterMetadataService.instance().commit(new 
Unregister(ClusterMetadata.current().myNodeId(),
+                                                                            
EnumSet.allOf(NodeState.class),
+                                                                            
ClusterMetadataService.instance().placementProvider()));
+
+                    // Register a ghost node with V0 (bypasses version check 
because directory is now empty).
+                    // In a real world cluster we will always be upgrading 
from a smaller version.
+                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
+                                                                          
TEST_LOCATION,
                                                                           new 
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
                 }
                 catch (UnknownHostException e)
@@ -187,9 +194,6 @@ public class RegisterTest extends TestBaseImpl
                 ClusterMetadata cm = new 
MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch);
                 cm.equals(ClusterMetadata.current());
             });
-
-
         }
     }
-
 }
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 9f7a12a854..1bea5d4d4b 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -151,13 +151,8 @@ import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.streaming.StreamResultFuture;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.membership.NodeAddresses;
-import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
@@ -1205,13 +1200,6 @@ public class Util
             assertEquals(0, ((SSTableReaderWithFilter) 
reader).getFilterOffHeapSize());
     }
 
-    public static void setUpgradeFromVersion(String version)
-    {
-        InetAddressAndPort ep = 
InetAddressAndPort.getByNameUnchecked("127.0.0.10");
-        Register.register(new NodeAddresses(ep),
-                          new NodeVersion(new CassandraVersion(version), 
Version.OLD));
-    }
-
     /**
      * Sets the length of the file to given size. File will be created if not 
exist.
      *
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
index a2e4be9b91..695d5d7410 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
@@ -22,17 +22,16 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.tcm.membership.NodeVersion;
 
 /* InsertUpdateIfConditionCollectionsTest class has been split into multiple 
ones because of timeout issues (CASSANDRA-16670)
  * Any changes here check if they apply to the other classes
@@ -46,18 +45,16 @@ public class InsertUpdateIfConditionCollectionsTest extends CQLTester
     @Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
     public static Collection<Object[]> data()
     {
-        ServerTestUtils.daemonInitialization();
-
         return InsertUpdateIfConditionTest.data();
     }
 
     @Parameterized.Parameter(0)
-    public String clusterMinVersion;
+    public NodeVersion clusterMinVersion;
 
     @BeforeClass
-    public static void beforeClass()
+    public static void setUpClass()
     {
-        InsertUpdateIfConditionTest.beforeClass();
+        InsertUpdateIfConditionTest.setUpClass();
     }
 
     @Before
@@ -66,12 +63,6 @@ public class InsertUpdateIfConditionCollectionsTest extends CQLTester
         InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
     }
 
-    @AfterClass
-    public static void afterClass()
-    {
-        InsertUpdateIfConditionTest.afterClass();
-    }
-
     /**
      * Migrated from cql_tests.py:TestCQL.bug_6069_test()
      */
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
index 7b670956a0..25d1387050 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.cql3.validation.operations;
 
 import java.util.Collection;
 
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -29,6 +28,7 @@ import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.tcm.membership.NodeVersion;
 
 /* InsertUpdateIfConditionCollectionsTest class has been split into multiple 
ones because of timeout issues (CASSANDRA-16670)
  * Any changes here check if they apply to the other classes
@@ -47,12 +47,12 @@ public class InsertUpdateIfConditionStaticsTest extends CQLTester
     }
 
     @Parameterized.Parameter(0)
-    public String clusterMinVersion;
+    public NodeVersion clusterMinVersion;
 
     @BeforeClass
-    public static void beforeClass()
+    public static void setUpClass()
     {
-        InsertUpdateIfConditionTest.beforeClass();
+        InsertUpdateIfConditionTest.setUpClass();
     }
 
     @Before
@@ -61,12 +61,6 @@ public class InsertUpdateIfConditionStaticsTest extends CQLTester
         InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
     }
 
-    @AfterClass
-    public static void afterClass()
-    {
-        InsertUpdateIfConditionTest.afterClass();
-    }
-
     /**
      * Migrated from cql_tests.py:TestCQL.static_columns_cas_test()
      */
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 750141674a..68663d3f85 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.cql3.validation.operations;
 import java.util.Arrays;
 import java.util.Collection;
 
-import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -29,12 +28,20 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import org.apache.cassandra.ServerTestUtils;
-import org.apache.cassandra.Util;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.Duration;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaKeyspaceTables;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.transformations.Register;
+import org.apache.cassandra.utils.CassandraVersion;
 
 import static java.lang.String.format;
 import static org.junit.Assert.assertEquals;
@@ -53,16 +60,23 @@ public class InsertUpdateIfConditionTest extends CQLTester
     {
         ServerTestUtils.daemonInitialization();
         // TODO [tcm] we will require upgrading from 4.1
-        return Arrays.asList(new Object[]{ "4.1" }, new Object[]{ "4.0" });
+        return Arrays.asList(new Object[]{ NodeVersion.CURRENT },
+                             new Object[]{ new NodeVersion( new 
CassandraVersion("4.1"), Version.OLD) },
+                             new Object[]{ new NodeVersion( new 
CassandraVersion("4.0"), Version.OLD) });
     }
 
     @Parameterized.Parameter(0)
-    public String clusterMinVersion;
+    public NodeVersion clusterMinVersion;
 
     @BeforeClass
-    public static void beforeClass()
+    public static void setUpClass()
     {
-        Gossiper.instance.start(0);
+        // This intentionally shadows CQLTester::setUpClass, in order to 
initialize the ClusterMetadataService
+        // without automatically registering the first node. This is so the 
directory can be setup to mimic a
+        // mid-upgrade cluster with nodes on both old and new versions.
+        prePrepareServer();
+        ServerTestUtils.prepareServerNoRegister();
+        ServerTestUtils.markCMS();  // CQLTester::afterTest will reset the CMS 
& ClusterMetadata to this state
     }
 
     @Before
@@ -70,21 +84,27 @@ public class InsertUpdateIfConditionTest extends CQLTester
     {
         beforeSetup(clusterMinVersion);
     }
-    
-    public static void beforeSetup(String clusterMinVersion)
-    {
-        // setUpgradeFromVersion adds node2 to the Gossiper. On slow CI envs 
the Gossiper might auto-remove it after some
-        // timeout if it thinks it's a fat client making the test fail. Just 
retry C18393.
-        Util.spinAssertEquals(Boolean.TRUE, () -> {
-            Util.setUpgradeFromVersion(clusterMinVersion);
-            return true;
-        }, 5);
-    }
 
-    @AfterClass
-    public static void afterClass()
+    public static void beforeSetup(NodeVersion clusterMinVersion)
     {
-        Gossiper.instance.stop();
+        // Add two entries to ClusterMetadata, to make it potentially appear 
as a mixed-version cluster (if the
+        // supplied version is lower than current).
+        ClusterMetadataService.instance()
+                              .commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.10")),
+                                                   new Location("dc1", 
"rack1"),
+                                                   clusterMinVersion));
+        ClusterMetadataService.instance()
+                              .commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.20")),
+                                                   new Location("dc1", 
"rack1"),
+                                                   NodeVersion.CURRENT));
+
+        Directory directory = ClusterMetadata.current().directory;
+        assertEquals(directory.clusterMinVersion, clusterMinVersion);
+        assertEquals(NodeVersion.CURRENT, directory.clusterMaxVersion);
+        // Version.OLD does not influence commonSerializationVersion as 
un-upgraded nodes are completely outside
+        // the scope of maintaining backward compatible metadata 
serializations (i.e. they predate them entirely). So
+        // in this test, the common serialization version will always be the 
current one.
+        assertEquals(NodeVersion.CURRENT_METADATA_VERSION, 
ClusterMetadata.current().directory.commonSerializationVersion);
     }
 
     /**
diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
index ac3a278cf2..56af302c23 100644
--- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
+++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
@@ -34,17 +34,17 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.SchemaTransformation;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
-import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
 import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 import org.apache.cassandra.tcm.sequences.LockedRanges;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
 import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.AlterSchema;
-import org.apache.cassandra.tcm.transformations.Assassinate;
 import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.utils.CassandraVersion;
 
 import static 
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr;
@@ -137,11 +137,15 @@ public class ClusterMetadataTest
     private static void newTransformationHelper(Transformation transformation)
     {
         NodeId v4Node = null;
+        NodeAddresses v4Addresses = null;
         for (int i = 1; i <= 4; i++)
         {
-            NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0", 
"rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 4 ? Version.V4 : 
Version.V5));
-            if (i == 4)
+            NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0", 
"rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 1 ? Version.V4 : 
Version.V5));
+            if (i == 1)
+            {
                 v4Node = nodeId;
+                v4Addresses = new NodeAddresses(addr(i));
+            }
             ClusterMetadataTestHelper.join(i, i);
         }
 
@@ -154,7 +158,8 @@ public class ClusterMetadataTest
         {
             assertTrue(e.getMessage().contains("Transformation rejected"));
         }
-        ClusterMetadataService.instance().commit(new Assassinate(v4Node, new 
UniformRangePlacement()));
+        // "upgrade" v4Node and the transformation should become committable
+        ClusterMetadataService.instance().commit(new Startup(v4Node, 
v4Addresses, new NodeVersion(CassandraVersion.CASSANDRA_5_0, Version.V5)));
         ClusterMetadataService.instance().commit(transformation);
     }
 
diff --git a/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java
new file mode 100644
index 0000000000..3b385b01b4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RegisterTest
+{
+    private static final Location LOCATION = new Location("dc", "rack");
+
+    /**
+     * Tests that registering a new node with a serialization version lower than the cluster's
+     * commonSerializationVersion is rejected.
+     */
+    @Test
+    public void rejectsLowerSerializationVersion() throws UnknownHostException
+    {
+        NodeId existingNode = new NodeId(1);
+
+        Directory directory = Directory.EMPTY
+                              .unsafeWithNodeForTesting(existingNode,
+                                                        new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+                                                        LOCATION,
+                                                        NodeVersion.CURRENT)
+                              .withNodeState(existingNode, NodeState.JOINED);
+
+        ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+                                                            .transformer()
+                                                            .with(directory)
+                                                            .build().metadata;
+
+        assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION", NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion);
+
+        // Try to register a new node with V3 (lower than cluster's current version)
+        NodeVersion lowerVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+        Register register = new Register(
+            new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")),
+            LOCATION,
+            lowerVersion
+        );
+
+        Transformation.Result result = register.execute(metadata);
+
+        assertTrue("Registration should be rejected for node with lower serialization version", result.isRejected());
+        assertEquals(ExceptionCode.INVALID, result.rejected().code);
+    }
+
+    /**
+     * Tests that registering nodes with serialization version equal to or higher than
+     * the cluster's commonSerializationVersion is allowed.
+     */
+    @Test
+    public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException
+    {
+        NodeId existingNode = new NodeId(1);
+        NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+
+        Directory directory = Directory.EMPTY
+                              .unsafeWithNodeForTesting(existingNode,
+                                                        new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+                                                        LOCATION,
+                                                        v3)
+                              .withNodeState(existingNode, NodeState.JOINED);
+
+        ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+                                                            .transformer()
+                                                            .with(directory)
+                                                            .build().metadata;
+
+        assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion);
+
+        // Register a node with higher version - should succeed
+        Register registerHigher = new Register(
+            new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")),
+            LOCATION,
+            NodeVersion.CURRENT
+        );
+
+        Transformation.Result resultHigher = registerHigher.execute(metadata);
+        assertTrue("Registration should succeed for node with higher serialization version", resultHigher.isSuccess());
+
+        // Register a node with equal version - should succeed
+        Register registerEqual = new Register(
+            new NodeAddresses(InetAddressAndPort.getByName("127.0.0.3")),
+            LOCATION,
+            v3
+        );
+
+        Transformation.Result resultEqual = registerEqual.execute(metadata);
+        assertTrue("Registration should succeed for node with equal serialization version", resultEqual.isSuccess());
+    }
+
+    /**
+     * Tests that the first node in an empty cluster can register with any version
+     * (bypasses version check because directory is empty).
+     */
+    @Test
+    public void allowsAnyVersionForFirstNode() throws UnknownHostException
+    {
+        ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance);
+
+        assertTrue("Directory should be empty", metadata.directory.isEmpty());
+
+        // Register first node with V0 - should succeed because directory is empty
+        NodeVersion v0 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0);
+        Register register = new Register(
+            new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+            LOCATION,
+            v0
+        );
+
+        Transformation.Result result = register.execute(metadata);
+        assertTrue("First node registration should succeed with any version", result.isSuccess());
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java
new file mode 100644
index 0000000000..8e0c165a22
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StartupTest
+{
+    private static final Location LOCATION = new Location("dc", "rack");
+
+    /**
+     * Tests that the Startup transformation rejects downgrading a node to a version
+     * that cannot read cluster metadata.
+     */
+    @Test
+    public void rejectsDowngrade() throws UnknownHostException
+    {
+        NodeId nodeId = new NodeId(1);
+        NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1"));
+
+        Directory directory = Directory.EMPTY
+                              .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, NodeVersion.CURRENT)
+                              .withNodeState(nodeId, NodeState.JOINED);
+
+        ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+                                                            .transformer()
+                                                            .with(directory)
+                                                            .build().metadata;
+
+        assertEquals("commonSerializationVersion should be CURRENT_METADATA_VERSION",
+                     NodeVersion.CURRENT_METADATA_VERSION, metadata.directory.commonSerializationVersion);
+
+        // Try to "downgrade" the node to V3 (simulating restart with older binary)
+        NodeVersion downgradedVersion = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+        Startup startup = new Startup(nodeId, addresses, downgradedVersion);
+
+        Transformation.Result result = startup.execute(metadata);
+
+        assertTrue("Startup should be rejected for downgrade to lower serialization version", result.isRejected());
+        assertEquals(ExceptionCode.INVALID, result.rejected().code);
+    }
+
+    /**
+     * Tests that the Startup transformation allows a node to restart with equal or higher
+     * serialization version.
+     */
+    @Test
+    public void allowsEqualOrHigherSerializationVersion() throws UnknownHostException
+    {
+        NodeId nodeId = new NodeId(1);
+        NodeAddresses addresses = new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1"));
+        NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+
+        Directory directory = Directory.EMPTY
+                              .unsafeWithNodeForTesting(nodeId, addresses, LOCATION, v3)
+                              .withNodeState(nodeId, NodeState.JOINED);
+
+        ClusterMetadata metadata = ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+                                                            .transformer()
+                                                            .with(directory)
+                                                            .build().metadata;
+
+        assertEquals("commonSerializationVersion should be V3", Version.V3, metadata.directory.commonSerializationVersion);
+
+        // Startup with higher version - should succeed
+        Startup startupHigher = new Startup(nodeId, addresses, NodeVersion.CURRENT);
+
+        Transformation.Result resultHigher = startupHigher.execute(metadata);
+        assertTrue("Startup should succeed for higher serialization version", resultHigher.isSuccess());
+
+        // Startup with equal version - should succeed
+        Startup startupEqual = new Startup(nodeId, addresses, v3);
+
+        Transformation.Result resultEqual = startupEqual.execute(metadata);
+        assertTrue("Startup should succeed for equal serialization version", resultEqual.isSuccess());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to