This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 417bb21d2eea9081bbdafd11c1ca6769b8ca9acf Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Mar 3 08:40:21 2025 +0100 Avoid adding LEFT nodes to tokenMap on upgrade from gossip Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20344 --- CHANGES.txt | 1 + .../cassandra/tcm/ClusterMetadataService.java | 6 +- src/java/org/apache/cassandra/tcm/Startup.java | 14 ++++- .../cassandra/tcm/compatibility/GossipHelper.java | 19 +++--- .../apache/cassandra/tcm/membership/Directory.java | 5 +- .../ClusterMetadataUpgradeAssassinateTest.java | 67 ++++++++++++++++++++++ 6 files changed, 101 insertions(+), 11 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index d0778def18..76e6fc4872 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Avoid adding LEFT nodes to tokenMap on upgrade from gossip (CASSANDRA-20344) * Allow empty placements when deserializing cluster metadata (CASSANDRA-20343) * Reduce heap pressure when initializing CMS (CASSANDRA-20267) * Paxos Repair: NoSuchElementException on DistributedSchema.getKeyspaceMetadata (CASSANDRA-20320) diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 8adfb81051..4c6eed11b4 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -55,6 +55,7 @@ import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LocalLog; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.migration.GossipProcessor; @@ -343,6 +344,9 @@ public class ClusterMetadataService continue; } + if (metadata.directory.peerState(entry.getKey()) == NodeState.LEFT) + continue; + if 
(!version.isUpgraded()) { String msg = String.format("All nodes are not yet upgraded - %s is running %s", metadata.directory.endpoint(entry.getKey()), version); @@ -356,7 +360,7 @@ public class ClusterMetadataService logger.info("First CMS node"); Set<InetAddressAndPort> candidates = metadata .directory - .allAddresses() + .allJoinedEndpoints() .stream() .filter(ep -> !FBUtilities.getBroadcastAddressAndPort().equals(ep) && !ignored.contains(ep)) diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java index 151d24ab90..edd8734fca 100644 --- a/src/java/org/apache/cassandra/tcm/Startup.java +++ b/src/java/org/apache/cassandra/tcm/Startup.java @@ -321,11 +321,21 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; } Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration()); for (Map.Entry<NodeId, NodeState> entry : initial.directory.states.entrySet()) - Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial); + { + InetAddressAndPort ep = initial.directory.addresses.get(entry.getKey()).broadcastAddress; + if (entry.getValue() != NodeState.LEFT) + Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial); + else + Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.endpointStateMap.put(ep, epStates.get(ep))); + } // double check that everything was added, can remove once we are confident ClusterMetadata cmGossip = fromEndpointStates(emptyFromSystemTables.schema, Gossiper.instance.getEndpointStates()); - assert cmGossip.equals(initial) : cmGossip + " != " + initial; + if (!cmGossip.equals(initial)) + { + cmGossip.dumpDiff(initial); + throw new AssertionError("Issue when populating gossip from cluster metadata"); + } } public static void reinitializeWithClusterMetadata(String fileName, Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws IOException, StartupException diff --git 
a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 31a20d0bd0..0e1acbc4ea 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -107,10 +107,10 @@ public class GossipHelper } public static VersionedValue nodeStateToStatus(NodeId nodeId, - ClusterMetadata metadata, - Collection<Token> tokens, - VersionedValue.VersionedValueFactory valueFactory, - VersionedValue oldValue) + ClusterMetadata metadata, + Collection<Token> tokens, + VersionedValue.VersionedValueFactory valueFactory, + VersionedValue oldValue) { NodeState nodeState = metadata.directory.peerState(nodeId); if ((tokens == null || tokens.isEmpty()) && !NodeState.isBootstrap(nodeState)) @@ -344,13 +344,18 @@ public class GossipHelper NodeAddresses nodeAddresses = getAddressesFromEndpointState(endpoint, epState); NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, epState); assert hostIdString != null; + NodeState nodeState = toNodeState(endpoint, epState); + directory = directory.withNonUpgradedNode(nodeAddresses, new Location(dc, rack), nodeVersion, - toNodeState(endpoint, epState), + nodeState, UUID.fromString(hostIdString)); - NodeId nodeId = directory.peerId(endpoint); - tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState)); + if (nodeState != NodeState.LEFT) + { + NodeId nodeId = directory.peerId(endpoint); + tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState)); + } } ClusterMetadata forPlacementCalculation = new ClusterMetadata(Epoch.UPGRADE_GOSSIP, diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index 87a6bde053..aab40989d0 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -156,7 
+156,10 @@ public class Directory implements MetadataValue<Directory> UUID hostId) { NodeId id = new NodeId(nextId); - return with(addresses, id, hostId, location, version).withNodeState(id, state).withRackAndDC(id); + Directory updated = with(addresses, id, hostId, location, version).withNodeState(id, state); + if (state != NodeState.LEFT) + updated = updated.withRackAndDC(id); + return updated; } @VisibleForTesting diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java new file mode 100644 index 0000000000..9e5be9639a --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.upgrade; + +import com.google.common.collect.Streams; +import org.junit.Test; + +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; + +public class ClusterMetadataUpgradeAssassinateTest extends UpgradeTestBase +{ + @Test + public void simpleUpgradeTest() throws Throwable + { + new TestCase() + .nodes(3) + .nodesToUpgrade(1, 2) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + cluster.get(3).shutdown().get(); + cluster.get(1).nodetoolResult("assassinate", "127.0.0.3").asserts().success(); + }) + .runAfterClusterUpgrade((cluster) -> { + checkPlacements(cluster.get(1)); + checkPlacements(cluster.get(2)); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + checkPlacements(cluster.get(1)); + checkPlacements(cluster.get(2)); + }).run(); + } + + private void checkPlacements(IUpgradeableInstance i) + { + ((IInvokableInstance) i).runOnInstance(() -> { + ClusterMetadata metadata = ClusterMetadata.current(); + InetAddressAndPort ep = InetAddressAndPort.getByNameUnchecked("127.0.0.3"); + metadata.placements.asMap().forEach((key, value) -> { + if (Streams.concat(value.reads.endpoints.stream(), + value.writes.endpoints.stream()) + .anyMatch(fr -> fr.endpoints().contains(ep))) + throw new IllegalStateException(ep + " should not be in placements " + metadata.placements); + }); + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
