This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 11358bd3a8 Rewrite RegisterTest to verify serialization version ceiling
11358bd3a8 is described below
commit 11358bd3a84550871e1a49dea8c79304733859de
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Nov 30 19:56:34 2023 +0000
Rewrite RegisterTest to verify serialization version ceiling
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-19073
---
.../distributed/test/log/RegisterTest.java | 84 ++++++++++++++--------
1 file changed, 55 insertions(+), 29 deletions(-)
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 8cfa95da27..466a660273 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -18,9 +18,9 @@
package org.apache.cassandra.distributed.test.log;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
+import java.io.IOException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import org.junit.Test;
@@ -32,11 +32,15 @@ import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
@@ -44,11 +48,12 @@ import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.SealPeriod;
+import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.Unregister;
import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.CassandraRelevantProperties.TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES;
+import static org.junit.Assert.assertEquals;
public class RegisterTest extends TestBaseImpl
{
@@ -91,49 +96,70 @@ public class RegisterTest extends TestBaseImpl
}
@Test
- public void serializationVersionDisagreementTest() throws Throwable
+ public void serializationVersionCeilingTest() throws Throwable
{
- try (Cluster cluster = builder().withNodes(2)
+ try (Cluster cluster = builder().withNodes(1)
.createWithoutStarting();
WithProperties prop = new WithProperties().set(TCM_ALLOW_TRANSFORMATIONS_DURING_UPGRADES, "true"))
{
+ final String firstNodeEndpoint = "127.0.0.10";
cluster.get(1).startup();
cluster.get(1).runOnInstance(() -> {
try
{
// Register a ghost node with V0 to fake-force V0 serialization. In a real world cluster we will always be upgrading from a smaller version.
- ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
+ ClusterMetadataService.instance().commit(new Register(new NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
new NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
- }
- catch (UnknownHostException e)
- {
- throw new RuntimeException(e);
- }
- ClusterMetadataService.instance().commit(SealPeriod.instance);
- });
-
- cluster.get(2).runOnInstance(() -> {
- try
- {
- Field field = NodeVersion.class.getDeclaredField("CURRENT");
- Field modifiers = Field.class.getDeclaredField("modifiers");
-
- field.setAccessible(true);
- modifiers.setAccessible(true);
-
- int newModifiers = field.getModifiers() & ~Modifier.FINAL;
- modifiers.setInt(field, newModifiers);
- field.set(null, new NodeVersion(new CassandraVersion(FBUtilities.getReleaseVersionString()), NodeVersion.CURRENT_METADATA_VERSION));
+ NodeId oldNode = ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
+ // Fake an upgrade of this node and assert we continue to serialize so that the one which only
+ // supports V0 can deserialize. In a real cluster it wouldn't happen exactly in this way (here the
+ // min serialization version actually goes backwards from CURRENT to V0 when we upgrade, which would
+ // not happen in a real cluster as we would never register like oldNode, with the current C* version
+ // but an older metadata version
+ CassandraVersion currentVersion = NodeVersion.CURRENT.cassandraVersion;
+ NodeVersion upgraded = new NodeVersion(new CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
+ NodeVersion.CURRENT_METADATA_VERSION);
+ ClusterMetadata metadata = ClusterMetadata.current();
+ NodeId id = metadata.myNodeId();
+ Startup startup = new Startup(id, metadata.directory.getNodeAddresses(id), upgraded);
+ ClusterMetadataService.instance().commit(startup);
+ // Doesn't matter which specific Transformation we use here, we're testing that the serializer uses
+ // the correct lower bound
+ Transformation t = new Register(NodeAddresses.current(), new Location("DC", "RACK"), NodeVersion.CURRENT);
+ try
+ {
+ assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
+ Version.V0.asInt());
+ ByteBuffer bytes = t.kind().toVersionedBytes(t);
+ try (DataInputBuffer buf = new DataInputBuffer(bytes, true))
+ {
+ // Because ClusterMetadata.current().directory still contains oldNode we must serialize at
+ // the version it supports
+ assertEquals(Version.V0, Version.fromInt(buf.readUnsignedVInt32()));
+ }
+
+ // If we unregister oldNode, then the ceiling for serialization version will rise
+ ClusterMetadataService.instance().commit(new Unregister(oldNode));
+ assertEquals(ClusterMetadata.current().directory.clusterMinVersion.serializationVersion,
+ NodeVersion.CURRENT_METADATA_VERSION.asInt());
+ bytes = t.kind().toVersionedBytes(t);
+ try (DataInputBuffer buf = new DataInputBuffer(bytes, true))
+ {
+ assertEquals(NodeVersion.CURRENT_METADATA_VERSION, Version.fromInt(buf.readUnsignedVInt32()));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
- catch (NoSuchFieldException | IllegalAccessException e)
+ catch (UnknownHostException e)
{
throw new RuntimeException(e);
}
});
-
- cluster.get(2).startup();
}
}
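
For readers skimming the diff, the version-prefix check the new test relies on boils down to the sketch below. It is not part of the commit; it only reuses calls that already appear in the diff (Transformation.kind().toVersionedBytes, DataInputBuffer, Version.fromInt, readUnsignedVInt32), and the helper class name is made up for illustration. toVersionedBytes writes the chosen serialization version as an unsigned vint ahead of the payload, and the test asserts that this version is capped at the lowest metadata version any registered peer supports.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.tcm.Transformation;
    import org.apache.cassandra.tcm.serialization.Version;

    // Hypothetical helper, for illustration only.
    final class VersionPrefix
    {
        // Reads back the serialization version that kind().toVersionedBytes(t)
        // wrote as an unsigned vint at the start of the serialized transformation.
        static Version of(Transformation t) throws IOException
        {
            ByteBuffer bytes = t.kind().toVersionedBytes(t);
            try (DataInputBuffer buf = new DataInputBuffer(bytes, true))
            {
                return Version.fromInt(buf.readUnsignedVInt32());
            }
        }
    }

In the test above this prefix is Version.V0 while the ghost V0 node is still registered, and NodeVersion.CURRENT_METADATA_VERSION once that node has been unregistered.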