This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit cdfce6b4ac5b2d3a6106001f4d6eb9234bbaa300 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Wed Sep 11 13:17:26 2024 +0200 Allow nodes to change IP address while upgrading to TCM Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-19921 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/tcm/Epoch.java | 4 +- src/java/org/apache/cassandra/tcm/Startup.java | 32 +++++++++- .../cassandra/tcm/compatibility/GossipHelper.java | 11 ++-- .../org/apache/cassandra/tcm/log/LogState.java | 9 +++ .../apache/cassandra/tcm/membership/Directory.java | 12 ++++ .../apache/cassandra/tcm/migration/Election.java | 2 + .../cassandra/tcm/migration/GossipCMSListener.java | 51 ++++++++++++--- .../tcm/transformations/cms/PreInitialize.java | 6 +- .../distributed/impl/AbstractCluster.java | 7 ++- .../ClusterMetadataUpgradeChangeIPTest.java | 73 ++++++++++++++++++++++ 11 files changed, 188 insertions(+), 20 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 37797aa0e2..a56ec716dd 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Allow nodes to change IP address while upgrading to TCM (CASSANDRA-19921) * Retain existing keyspace params on system tables after upgrade (CASSANDRA-19916) * Deprecate use of gossip state for paxos electorate verification (CASSANDRA-19904) * Update dtest-api to 0.0.17 to fix jvm17 crash in jvm-dtests (CASSANDRA-19239) diff --git a/src/java/org/apache/cassandra/tcm/Epoch.java b/src/java/org/apache/cassandra/tcm/Epoch.java index 1b7c33a895..0d070b4a5b 100644 --- a/src/java/org/apache/cassandra/tcm/Epoch.java +++ b/src/java/org/apache/cassandra/tcm/Epoch.java @@ -101,7 +101,9 @@ public class Epoch implements Comparable<Epoch>, Serializable public Epoch nextEpoch() { - if (beforeFirst.contains(this)) + if (this == UPGRADE_GOSSIP || this == UPGRADE_STARTUP) + return this; + if (this == EMPTY) return FIRST; return new Epoch(epoch + 1); diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 023fcdba77..459566ff80 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -41,10 +41,12 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.dht.BootStrapper; import org.apache.cassandra.exceptions.StartupException; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.NewGossiper; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -281,12 +283,40 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; logger.debug("Starting to initialize ClusterMetadata from gossip"); Map<InetAddressAndPort, EndpointState> epStates = NewGossiper.instance.doShadowRound(); + InetAddressAndPort switchIp = null; + if (!epStates.containsKey(getBroadcastAddressAndPort())) + { + UUID hostId = SystemKeyspace.getLocalHostId(); + for (Map.Entry<InetAddressAndPort, EndpointState> epstate : epStates.entrySet()) + { + EndpointState state = epstate.getValue(); + VersionedValue gossipHostId = state.getApplicationState(ApplicationState.HOST_ID); + if (gossipHostId != null && UUID.fromString(gossipHostId.value).equals(hostId)) + { + switchIp = epstate.getKey(); + break; + } + } + if (switchIp != null) + { + logger.info("Changing IP in gossip mode from {} to {}", switchIp, getBroadcastAddressAndPort()); + // we simply switch the key to the new ip here to make sure we grab NodeAddresses.current() for + // this node when constructing the initial ClusterMetadata in GossipHelper#getAddressesFromEndpointState + epStates.put(getBroadcastAddressAndPort(), epStates.remove(switchIp)); + } + } + logger.debug("Got epStates {}", epStates); ClusterMetadata initial = fromEndpointStates(emptyFromSystemTables.schema, epStates); logger.debug("Created initial ClusterMetadata {}", initial); - SystemKeyspace.setLocalHostId(initial.myNodeId().toUUID()); ClusterMetadataService.instance().setFromGossip(initial); Gossiper.instance.clearUnsafe(); + if (switchIp != null) + { + // quarantine the old ip to make sure it doesn't get re-added via gossip + InetAddressAndPort removeEp = switchIp; + Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.removeEndpoint(removeEp)); + } Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); for (Map.Entry<NodeId, NodeState> entry : initial.directory.states.entrySet()) Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial); diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 048bb99ca1..8f4861cc17 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -229,7 +229,7 @@ public class GossipHelper throw new IllegalStateException("Can't upgrade the first node when STATUS = " + status + " for node " + endpoint); } - private static NodeAddresses getAddressesFromEndpointState(InetAddressAndPort endpoint, EndpointState epState) + public static NodeAddresses getAddressesFromEndpointState(InetAddressAndPort endpoint, EndpointState epState) { if (endpoint.equals(getBroadcastAddressAndPort())) return NodeAddresses.current(); @@ -322,8 +322,8 @@ public class GossipHelper @VisibleForTesting public static ClusterMetadata fromEndpointStates(Map<InetAddressAndPort, EndpointState> epStates, IPartitioner partitioner, DistributedSchema schema) { - Directory directory = new Directory(); - TokenMap tokenMap = new TokenMap(partitioner); + Directory directory = new Directory().withLastModified(Epoch.UPGRADE_GOSSIP); + TokenMap tokenMap = new TokenMap(partitioner).withLastModified(Epoch.UPGRADE_GOSSIP); List<InetAddressAndPort> sortedEps = Lists.newArrayList(epStates.keySet()); Collections.sort(sortedEps); Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(); @@ -354,12 +354,15 @@ public class GossipHelper LockedRanges.EMPTY, InProgressSequences.EMPTY, extensions); + DataPlacements placements = new UniformRangePlacement().calculatePlacements(Epoch.UPGRADE_GOSSIP, + forPlacementCalculation, + schema.getKeyspaces()); return new ClusterMetadata(Epoch.UPGRADE_GOSSIP, partitioner, schema, directory, tokenMap, - new UniformRangePlacement().calculatePlacements(Epoch.UPGRADE_GOSSIP, forPlacementCalculation, schema.getKeyspaces()), + placements, LockedRanges.EMPTY, InProgressSequences.EMPTY, extensions); diff --git a/src/java/org/apache/cassandra/tcm/log/LogState.java b/src/java/org/apache/cassandra/tcm/log/LogState.java index de69184a89..03294e9ffb 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogState.java +++ b/src/java/org/apache/cassandra/tcm/log/LogState.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -40,6 +41,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataSnapshots; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.VerboseMetadataSerializer; @@ -294,6 +296,13 @@ public class LogState { logger.info("Received metadata log notification from {}, marking in progress migration complete", message.from()); ClusterMetadataService.instance().migrated(); + ClusterMetadata metadata = ClusterMetadata.currentNullable(); + if (metadata != null) + { + NodeId mynodeId = metadata.myNodeId(); + if (mynodeId != null) + SystemKeyspace.setLocalHostId(mynodeId.toUUID()); + } } log.append(message.payload); diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index bbd1225cbc..2b3dc967e4 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -712,11 +712,19 @@ public class Directory implements MetadataValue<Directory> { logger.warn("nextId differ: {} != {}", nextId, other.nextId); } + if (!Objects.equals(lastModified, other.lastModified)) + { + logger.warn("Last modified differ: {} != {}", lastModified, other.lastModified); + } if (!Objects.equals(peers, other.peers)) { logger.warn("Peers differ: {} != {}", peers, other.peers); dumpDiff(logger, peers, other.peers); } + if (!Objects.equals(locations, other.locations)) + { + logger.warn("Locations differ: {} != {}", locations, other.locations); + } if (!Objects.equals(states, other.states)) { logger.warn("States differ: {} != {}", states, other.states); @@ -727,6 +735,10 @@ public class Directory implements MetadataValue<Directory> logger.warn("Endpoints by dc differ: {} != {}", endpointsByDC, other.endpointsByDC); dumpDiff(logger, endpointsByDC.asMap(), other.endpointsByDC.asMap()); } + if (!Objects.equals(racksByDC, other.racksByDC)) + { + logger.warn("Racks by dc differ: {} != {}", racksByDC, other.racksByDC); + } if (!Objects.equals(versions, other.versions)) { logger.warn("Versions differ: {} != {}", versions, other.versions); diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 8f45699c8d..2bd2f7a392 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -35,6 +35,7 @@ import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -140,6 +141,7 @@ public class Election Startup.initializeAsFirstCMSNode(); Register.maybeRegister(); + SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID()); updateInitiator(currentCoordinator, MIGRATED); MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java index b33f4fbea1..b712a695a2 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipCMSListener.java @@ -18,17 +18,25 @@ package org.apache.cassandra.tcm.migration; +import java.util.UUID; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.compatibility.GossipHelper; +import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.utils.CassandraVersion; public class GossipCMSListener implements IEndpointStateChangeSubscriber @@ -45,26 +53,49 @@ public class GossipCMSListener implements IEndpointStateChangeSubscriber NodeId nodeId = metadata.directory.peerId(endpoint); if (nodeId == null) { - logger.error("Unknown node {} started", endpoint); - return; + VersionedValue hostIdValue = epState.getApplicationState(ApplicationState.HOST_ID); + if (hostIdValue != null) + { + UUID hostId = UUID.fromString(hostIdValue.value); + nodeId = metadata.directory.nodeIdFromHostId(hostId); + logger.info("Node {} (hostId = {}) changing IP from {} to {}", nodeId, hostId, metadata.directory.endpoint(nodeId), endpoint); + Gossiper.instance.removeEndpoint(endpoint); + } + else + { + logger.warn("Could not find NodeId for endpoint {}", endpoint); + return; + } } - // only thing that can change is the release version + // only thing that can change is the release version and addresses CassandraVersion gossipVersion = epState.getReleaseVersion(); - if (gossipVersion == null) - return; + NodeAddresses newAddresses = GossipHelper.getAddressesFromEndpointState(endpoint, epState); while (true) { NodeVersion cmVersion = metadata.directory.versions.get(nodeId); - if (cmVersion.cassandraVersion.equals(gossipVersion)) + if (cmVersion.cassandraVersion.equals(gossipVersion) && newAddresses.equals(metadata.directory.getNodeAddresses(nodeId))) { return; } else { - NodeVersion newNodeVersion = NodeVersion.fromCassandraVersion(gossipVersion); - ClusterMetadata newCM = metadata.transformer() - .withVersion(nodeId, newNodeVersion) - .buildForGossipMode(); + ClusterMetadata.Transformer transformer = metadata.transformer(); + if (gossipVersion != null && !cmVersion.cassandraVersion.equals(gossipVersion)) + transformer = transformer.withVersion(nodeId, NodeVersion.fromCassandraVersion(gossipVersion)); + + if (!newAddresses.equals(metadata.directory.getNodeAddresses(nodeId))) + { + transformer = transformer.withNewAddresses(nodeId, newAddresses); + DataPlacements newPlacement = ClusterMetadataService.instance() + .placementProvider() + .calculatePlacements(Epoch.UPGRADE_GOSSIP, + metadata.tokenMap.toRanges(), + transformer.build().metadata, + metadata.schema.getKeyspaces()); + transformer = transformer.with(newPlacement); + } + + ClusterMetadata newCM = transformer.buildForGossipMode(); if (ClusterMetadataService.instance().applyFromGossip(metadata, newCM)) return; metadata = ClusterMetadata.current(); diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java index dfc401e251..50aa3aef42 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/PreInitialize.java @@ -79,14 +79,14 @@ public class PreInitialize implements Transformation MetaStrategy.partitioner.getMinimumToken(), MetaStrategy.partitioner.getMinimumToken(), true); - dataPlacementBuilder.reads.withReplica(metadata.nextEpoch(), replica); - dataPlacementBuilder.writes.withReplica(metadata.nextEpoch(), replica); + dataPlacementBuilder.reads.withReplica(Epoch.FIRST, replica); + dataPlacementBuilder.writes.withReplica(Epoch.FIRST, replica); DataPlacements initialPlacement = metadata.placements.unbuild().with(ReplicationParams.meta(metadata), dataPlacementBuilder.build()).build(); transformer.with(initialPlacement); } ClusterMetadata.Transformer.Transformed transformed = transformer.build(); - metadata = transformed.metadata; + metadata = transformed.metadata.forceEpoch(Epoch.FIRST); assert metadata.epoch.is(Epoch.FIRST) : metadata.epoch; return new Success(metadata, LockedRanges.AffectedRanges.EMPTY, transformed.modifiedKeys); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index e7ffed2459..feca65208a 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -648,7 +648,12 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I public I bootstrap(IInstanceConfig config) { - I instance = newInstanceWrapperInternal(initialVersion, config); + return bootstrap(config, initialVersion); + } + + public I bootstrap(IInstanceConfig config, Versions.Version version) + { + I instance = newInstanceWrapperInternal(version, config); instances.add(instance); I prev = instanceMap.put(config.broadcastAddress(), instance); diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeChangeIPTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeChangeIPTest.java new file mode 100644 index 0000000000..c363ba5cf1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeChangeIPTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.impl.AbstractCluster; +import org.apache.cassandra.distributed.shared.NetworkTopology; + +public class ClusterMetadataUpgradeChangeIPTest extends UpgradeTestBase +{ + @Test + public void gossipModeIPChangeTest() throws Throwable + { + // all nodes upgraded, bouncing node3 to new ip while in gossip mode + ipChangeTestHelper(1, 2, 3); + } + + @Test + public void upgradeChangeIPTest() throws Throwable + { + // changing IP while upgrading node 3 + ipChangeTestHelper(1, 2); + } + + private void ipChangeTestHelper(int ... toUpgrade) throws Throwable + { + TokenSupplier ts = TokenSupplier.evenlyDistributedTokens(3); + new TestCase() + .nodesToUpgrade(toUpgrade) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .withBuilder(builder -> builder.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0")) + .withTokenSupplier((TokenSupplier) i -> i == 4 ? ts.tokens(3) : ts.tokens(i))) + .nodes(3) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> {}) + .runAfterClusterUpgrade((cluster) -> { + cluster.get(3).shutdown().get(); + IInstanceConfig nodeConfig = cluster.newInstanceConfig(); + nodeConfig.set("data_file_directories", cluster.get(3).config().get("data_file_directories")); + IUpgradeableInstance newInstance = cluster.bootstrap(nodeConfig, AbstractCluster.CURRENT_VERSION); + newInstance.startup(); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + + cluster.get(2).shutdown().get(); + cluster.get(2).startup(); + + cluster.get(2).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + }).run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org