This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 11358bd3a8 Rewrite RegisterTest to verify serialization version ceiling
11358bd3a8 is described below

commit 11358bd3a84550871e1a49dea8c79304733859de
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Nov 30 19:56:34 2023 +0000

    Rewrite RegisterTest to verify serialization version ceiling
    
    Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-19073
---
 .../distributed/test/log/RegisterTest.java         | 84 ++++++++++++++--------
 1 file changed, 55 insertions(+), 29 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 8cfa95da27..466a660273 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.cassandra.distributed.test.log;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
+import java.io.IOException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
@@ -32,11 +32,15 @@ import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Location;
 import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.sequences.LeaveStreams;
 import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
@@ -44,11 +48,12 @@ import org.apache.cassandra.tcm.serialization.Version;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.Register;
 import org.apache.cassandra.tcm.transformations.SealPeriod;
+import org.apache.cassandra.tcm.transformations.Startup;
 import org.apache.cassandra.tcm.transformations.Unregister;
 import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES;
+import static org.junit.Assert.assertEquals;
 
 public class RegisterTest extends TestBaseImpl
 {
@@ -91,49 +96,70 @@ public class RegisterTest extends TestBaseImpl
     }
 
     @Test
-    public void serializationVersionDisagreementTest() throws Throwable
+    public void serializationVersionCeilingTest() throws Throwable
     {
-        try (Cluster cluster = builder().withNodes(2)
+        try (Cluster cluster = builder().withNodes(1)
                                         .createWithoutStarting();
              WithProperties prop = new 
WithProperties().set(TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES, "true"))
         {
+            final String firstNodeEndpoint = "127.0.0.10";
             cluster.get(1).startup();
             cluster.get(1).runOnInstance(() -> {
                 try
                 {
                     // Register a ghost node with V0 to fake-force V0 
serialization. In a real world cluster we will always be upgrading from a 
smaller version.
-                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
+                    ClusterMetadataService.instance().commit(new Register(new 
NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
                                                                           
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
                                                                           new 
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-                ClusterMetadataService.instance().commit(SealPeriod.instance);
-            });
-
-            cluster.get(2).runOnInstance(() -> {
-                try
-                {
-                    Field field = 
NodeVersion.class.getDeclaredField("CURRENT");
-                    Field modifiers = 
Field.class.getDeclaredField("modifiers");
-
-                    field.setAccessible(true);
-                    modifiers.setAccessible(true);
-
-                    int newModifiers = field.getModifiers() & ~Modifier.FINAL;
-                    modifiers.setInt(field, newModifiers);
-                    field.set(null, new NodeVersion(new 
CassandraVersion(FBUtilities.getReleaseVersionString()), 
NodeVersion.CURRENT_METADATA_VERSION));
+                    NodeId oldNode = 
ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
+                    // Fake an upgrade of this node and assert we continue to 
serialize so that the one which only
+                    // supports V0 can deserialize. In a real cluster it 
wouldn't happen exactly in this way (here the
+                    // min serialization version actually goes backwards from 
CURRENT to V0 when we upgrade, which would
+                    // not happen in a real cluster as we would never register 
like oldNode, with the current C* version
+                    // but an older metadata version
+                    CassandraVersion currentVersion = 
NodeVersion.CURRENT.cassandraVersion;
+                    NodeVersion upgraded = new NodeVersion(new 
CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
+                                                            
NodeVersion.CURRENT_METADATA_VERSION);
+                    ClusterMetadata metadata = ClusterMetadata.current();
+                    NodeId id = metadata.myNodeId();
+                    Startup startup = new Startup(id, 
metadata.directory.getNodeAddresses(id), upgraded);
+                    ClusterMetadataService.instance().commit(startup);
+                    // Doesn't matter which specific Transformation we use 
here, we're testing that the serializer uses
+                    // the correct lower bound
+                    Transformation t = new Register(NodeAddresses.current(), 
new Location("DC", "RACK"), NodeVersion.CURRENT);
+                    try
+                    {
+                        
assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
+                                     Version.V0.asInt());
+                        ByteBuffer bytes = t.kind().toVersionedBytes(t);
+                        try (DataInputBuffer buf = new DataInputBuffer(bytes, 
true))
+                        {
+                            // Because ClusterMetadata.current().directory 
still contains oldNode we must serialize at
+                            // the version it supports
+                            assertEquals(Version.V0, 
Version.fromInt(buf.readUnsignedVInt32()));
+                        }
+
+                        // If we unregister oldNode, then the ceiling for 
serialization version will rise
+                        ClusterMetadataService.instance().commit(new 
Unregister(oldNode));
+                        
assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
+                                     
NodeVersion.CURRENT_METADATA_VERSION.asInt());
+                        bytes = t.kind().toVersionedBytes(t);
+                        try (DataInputBuffer buf = new DataInputBuffer(bytes, 
true))
+                        {
+                            assertEquals(NodeVersion.CURRENT_METADATA_VERSION, 
Version.fromInt(buf.readUnsignedVInt32()));
+                        }
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
 
                 }
-                catch (NoSuchFieldException | IllegalAccessException e)
+                catch (UnknownHostException e)
                 {
                     throw new RuntimeException(e);
                 }
             });
-
-            cluster.get(2).startup();
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to