This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 417bb21d2eea9081bbdafd11c1ca6769b8ca9acf
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Mar 3 08:40:21 2025 +0100

    Avoid adding LEFT nodes to tokenMap on upgrade from gossip
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20344
---
 CHANGES.txt                                        |  1 +
 .../cassandra/tcm/ClusterMetadataService.java      |  6 +-
 src/java/org/apache/cassandra/tcm/Startup.java     | 14 ++++-
 .../cassandra/tcm/compatibility/GossipHelper.java  | 19 +++---
 .../apache/cassandra/tcm/membership/Directory.java |  5 +-
 .../ClusterMetadataUpgradeAssassinateTest.java     | 67 ++++++++++++++++++++++
 6 files changed, 101 insertions(+), 11 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d0778def18..76e6fc4872 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Avoid adding LEFT nodes to tokenMap on upgrade from gossip (CASSANDRA-20344)
  * Allow empty placements when deserializing cluster metadata (CASSANDRA-20343)
  * Reduce heap pressure when initializing CMS (CASSANDRA-20267)
  * Paxos Repair: NoSuchElementException on DistributedSchema.getKeyspaceMetadata (CASSANDRA-20320)
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 8adfb81051..4c6eed11b4 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LocalLog;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
 import org.apache.cassandra.tcm.migration.Election;
 import org.apache.cassandra.tcm.migration.GossipProcessor;
@@ -343,6 +344,9 @@ public class ClusterMetadataService
                 continue;
             }
 
+            if (metadata.directory.peerState(entry.getKey()) == NodeState.LEFT)
+                continue;
+
             if (!version.isUpgraded())
             {
                 String msg = String.format("All nodes are not yet upgraded - %s is running %s", metadata.directory.endpoint(entry.getKey()), version);
@@ -356,7 +360,7 @@ public class ClusterMetadataService
             logger.info("First CMS node");
             Set<InetAddressAndPort> candidates = metadata
                                                  .directory
-                                                 .allAddresses()
+                                                 .allJoinedEndpoints()
                                                  .stream()
                                                  .filter(ep -> !FBUtilities.getBroadcastAddressAndPort().equals(ep) &&
                                                                !ignored.contains(ep))
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index 151d24ab90..edd8734fca 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -321,11 +321,21 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         }
         Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
         for (Map.Entry<NodeId, NodeState> entry : initial.directory.states.entrySet())
-            Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial);
+        {
+            InetAddressAndPort ep = initial.directory.addresses.get(entry.getKey()).broadcastAddress;
+            if (entry.getValue() != NodeState.LEFT)
+                Gossiper.instance.mergeNodeToGossip(entry.getKey(), initial);
+            else
+                Gossiper.runInGossipStageBlocking(() -> Gossiper.instance.endpointStateMap.put(ep, epStates.get(ep)));
+        }
 
         // double check that everything was added, can remove once we are confident
         ClusterMetadata cmGossip = fromEndpointStates(emptyFromSystemTables.schema, Gossiper.instance.getEndpointStates());
-        assert cmGossip.equals(initial) : cmGossip + " != " + initial;
+        if (!cmGossip.equals(initial))
+        {
+            cmGossip.dumpDiff(initial);
+            throw new AssertionError("Issue when populating gossip from cluster metadata");
+        }
     }
 
     public static void reinitializeWithClusterMetadata(String fileName, Function<Processor, Processor> wrapProcessor, Runnable initMessaging) throws IOException, StartupException
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 31a20d0bd0..0e1acbc4ea 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -107,10 +107,10 @@ public class GossipHelper
     }
 
     public static VersionedValue nodeStateToStatus(NodeId nodeId,
-                                                    ClusterMetadata metadata,
-                                                    Collection<Token> tokens,
-                                                    VersionedValue.VersionedValueFactory valueFactory,
-                                                    VersionedValue oldValue)
+                                                   ClusterMetadata metadata,
+                                                   Collection<Token> tokens,
+                                                   VersionedValue.VersionedValueFactory valueFactory,
+                                                   VersionedValue oldValue)
     {
         NodeState nodeState =  metadata.directory.peerState(nodeId);
         if ((tokens == null || tokens.isEmpty()) && !NodeState.isBootstrap(nodeState))
@@ -344,13 +344,18 @@ public class GossipHelper
             NodeAddresses nodeAddresses = getAddressesFromEndpointState(endpoint, epState);
             NodeVersion nodeVersion = getVersionFromEndpointState(endpoint, epState);
             assert hostIdString != null;
+            NodeState nodeState = toNodeState(endpoint, epState);
+
             directory = directory.withNonUpgradedNode(nodeAddresses,
                                                       new Location(dc, rack),
                                                       nodeVersion,
-                                                      toNodeState(endpoint, epState),
+                                                      nodeState,
                                                       UUID.fromString(hostIdString));
-            NodeId nodeId = directory.peerId(endpoint);
-            tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState));
+            if (nodeState != NodeState.LEFT)
+            {
+                NodeId nodeId = directory.peerId(endpoint);
+                tokenMap = tokenMap.assignTokens(nodeId, getTokensIn(partitioner, epState));
+            }
         }
 
         ClusterMetadata forPlacementCalculation = new ClusterMetadata(Epoch.UPGRADE_GOSSIP,
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 87a6bde053..aab40989d0 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -156,7 +156,10 @@ public class Directory implements MetadataValue<Directory>
                                          UUID hostId)
     {
         NodeId id = new NodeId(nextId);
-        return with(addresses, id, hostId, location, version).withNodeState(id, state).withRackAndDC(id);
+        Directory updated = with(addresses, id, hostId, location, version).withNodeState(id, state);
+        if (state != NodeState.LEFT)
+            updated = updated.withRackAndDC(id);
+        return updated;
     }
 
     @VisibleForTesting
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java
new file mode 100644
index 0000000000..9e5be9639a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAssassinateTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import com.google.common.collect.Streams;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+
+public class ClusterMetadataUpgradeAssassinateTest extends UpgradeTestBase
+{
+    @Test
+    public void simpleUpgradeTest() throws Throwable
+    {
+        new TestCase()
+        .nodes(3)
+        .nodesToUpgrade(1, 2)
+        .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP))
+        .upgradesToCurrentFrom(v50)
+        .setup((cluster) -> {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
+            cluster.get(3).shutdown().get();
+            cluster.get(1).nodetoolResult("assassinate", "127.0.0.3").asserts().success();
+        })
+        .runAfterClusterUpgrade((cluster) -> {
+            checkPlacements(cluster.get(1));
+            checkPlacements(cluster.get(2));
+            cluster.get(1).nodetoolResult("cms", "initialize").asserts().success();
+            checkPlacements(cluster.get(1));
+            checkPlacements(cluster.get(2));
+        }).run();
+    }
+
+    private void checkPlacements(IUpgradeableInstance i)
+    {
+        ((IInvokableInstance) i).runOnInstance(() -> {
+            ClusterMetadata metadata = ClusterMetadata.current();
+            InetAddressAndPort ep = InetAddressAndPort.getByNameUnchecked("127.0.0.3");
+            metadata.placements.asMap().forEach((key, value) -> {
+                if (Streams.concat(value.reads.endpoints.stream(),
+                                   value.writes.endpoints.stream())
+                           .anyMatch(fr -> fr.endpoints().contains(ep)))
+                    throw new IllegalStateException(ep + " should not be in placements " + metadata.placements);
+            });
+        });
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to