This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 46b90364da Change IP address of the CMS node during transition
46b90364da is described below
commit 46b90364daecf1880db5eda9899d7353ad81f445
Author: Alex Petrov <[email protected]>
AuthorDate: Thu Dec 21 13:47:22 2023 +0100
Change IP address of the CMS node during transition
Patch by Alex Petrov; reviewed by Sam Tunnicliffe and Marcus Eriksson for
CASSANDRA-19219
---
.../cassandra/locator/CMSPlacementStrategy.java | 4 --
.../cassandra/tcm/transformations/Startup.java | 20 +++++++
.../distributed/test/cms/CMSAddressChangeTest.java | 67 ++++++++++++++++++++++
.../test/log/ClusterMetadataTestHelper.java | 13 +++++
4 files changed, 100 insertions(+), 4 deletions(-)
diff --git a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
index ea4f0cbb9a..754687a199 100644
--- a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
+++ b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
@@ -28,7 +28,6 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Transformation;
@@ -137,9 +136,6 @@ public interface CMSPlacementStrategy
public Boolean apply(ClusterMetadata metadata, NodeId nodeId)
{
- if
(!FailureDetector.instance.isAlive(metadata.directory.endpoint(nodeId)))
- return false;
-
if (metadata.directory.peerState(nodeId) != NodeState.JOINED)
return false;
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java
b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
index b26cc3655c..b4d4007e43 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
@@ -24,7 +24,10 @@ import java.util.Objects;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Transformation;
@@ -32,12 +35,14 @@ import org.apache.cassandra.tcm.membership.Directory;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.serialization.MetadataSerializer;
import org.apache.cassandra.tcm.serialization.Version;
import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
+import static org.apache.cassandra.tcm.ownership.EntireRange.entireRange;
public class Startup implements Transformation
{
@@ -87,6 +92,21 @@ public class Startup implements Transformation
next.build().metadata,
allKeyspaces);
+ if (prev.isCMSMember(prev.directory.endpoint(nodeId)))
+ {
+ ReplicationParams metaParams = ReplicationParams.meta(prev);
+ InetAddressAndPort endpoint = prev.directory.endpoint(nodeId);
+ Replica leavingReplica = new Replica(endpoint, entireRange,
true);
+ Replica joiningReplica = new
Replica(addresses.broadcastAddress, entireRange, true);
+
+ DataPlacement.Builder builder =
prev.placements.get(metaParams).unbuild();
+ builder.reads.withoutReplica(prev.nextEpoch(), leavingReplica);
+ builder.writes.withoutReplica(prev.nextEpoch(),
leavingReplica);
+ builder.reads.withReplica(prev.nextEpoch(), joiningReplica);
+ builder.writes.withReplica(prev.nextEpoch(), joiningReplica);
+ newPlacement = newPlacement.unbuild().with(metaParams,
builder.build()).build();
+ }
+
next = next.with(newPlacement);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cms/CMSAddressChangeTest.java
b/test/distributed/org/apache/cassandra/distributed/test/cms/CMSAddressChangeTest.java
new file mode 100644
index 0000000000..3f6c45c637
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/cms/CMSAddressChangeTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.cms;
+
+import java.util.Collections;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.CMSTestBase;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.transformations.Startup;
+
+public class CMSAddressChangeTest extends CMSTestBase
+{
+ @Test
+ public void testCMSAddressChange()
+ {
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ Keyspace.setInitialized();
+ ClusterMetadataTestHelper.setInstanceForTest();
+ for (int i = 1; i < 10; i++)
+ {
+ ClusterMetadataTestHelper.register(i);
+ ClusterMetadataTestHelper.join(i, i);
+ }
+
+
ClusterMetadataTestHelper.reconfigureCms(ReplicationParams.ntsMeta(Collections.singletonMap("dc0",
3)));
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ InetAddressAndPort oldAddr =
metadata.fullCMSMembers().iterator().next();
+ InetAddressAndPort newAddr = ClusterMetadataTestHelper.addr(100);
+ NodeId cmsMemberNodeId = metadata.directory.peerId(oldAddr);
+
+ metadata = ClusterMetadataService.instance().commit(new
Startup(cmsMemberNodeId,
+ new
NodeAddresses(newAddr),
+
metadata.directory.version(cmsMemberNodeId)));
+
+ Assert.assertFalse(metadata.fullCMSMembers().contains(oldAddr));
+ Assert.assertTrue(metadata.fullCMSMembers().contains(newAddr));
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index 5af25a18b5..0046e170c8 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.DistributedSchema;
import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaTransformation;
import org.apache.cassandra.service.ClientState;
@@ -74,6 +75,7 @@ import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.Move;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
+import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.transformations.AlterSchema;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
@@ -81,6 +83,8 @@ import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareMove;
import org.apache.cassandra.tcm.transformations.PrepareReplace;
import org.apache.cassandra.tcm.transformations.Register;
+import org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration;
+import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.tcm.transformations.cms.Initialize;
import org.apache.cassandra.utils.FBUtilities;
@@ -835,6 +839,15 @@ public class ClusterMetadataTestHelper
.finishLeave();
}
+ public static void reconfigureCms(ReplicationParams replication)
+ {
+ ClusterMetadata metadata =
ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(replication));
+ while
(metadata.inProgressSequences.contains(ReconfigureCMS.SequenceKey.instance))
+ {
+ AdvanceCMSReconfiguration next = ((ReconfigureCMS)
metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)).next;
+ metadata = ClusterMetadataService.instance().commit(next);
+ }
+ }
public static void addOrUpdateKeyspace(KeyspaceMetadata keyspace)
{
try
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]