This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 2bab3f27ba Gossip NPE due to shutdown event corrupting empty statuses
2bab3f27ba is described below

commit 2bab3f27ba1535203d61497abe6810cdcb4640d0
Author: David Capwell <[email protected]>
AuthorDate: Tue Oct 10 09:22:05 2023 -0700

    Gossip NPE due to shutdown event corrupting empty statuses
    
    patch by David Capwell; reviewed by Brandon Williams for CASSANDRA-18913
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/EndpointState.java    |   5 +
 src/java/org/apache/cassandra/gms/Gossiper.java    |   2 +-
 .../cassandra/distributed/shared/ClusterUtils.java |  16 ++++
 .../test/gossip/GossipShutdownTest.java            | 106 +++++++++++++++++++++
 5 files changed, 129 insertions(+), 1 deletion(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 25691b1cd9..f0d59c2abf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.12
+ * Gossip NPE due to shutdown event corrupting empty statuses (CASSANDRA-18913)
  * Synchronize CQLSSTableWriter#build on the Schema.instance object (CASSANDRA-18317)
  * Fix closing iterator in SecondaryIndexBuilder (CASSANDRA-18361)
  * Update hdrhistogram to 2.1.12 (CASSANDRA-18893)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index b8d56263e7..782a72207c 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -191,6 +191,11 @@ public class EndpointState
         isAlive = false;
     }
 
+    public boolean isStateEmpty()
+    {
+        return applicationState.get().isEmpty();
+    }
+
     /**
      * @return true if {@link HeartBeatState#isEmpty()} is true and no STATUS application state exists
      */
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 2b3c48ed77..f88ee44edf 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -567,7 +567,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     {
         checkProperThreadForStateMutation();
         EndpointState epState = endpointStateMap.get(endpoint);
-        if (epState == null)
+        if (epState == null || epState.isStateEmpty())
             return;
         VersionedValue shutdown = StorageService.instance.valueFactory.shutdown(true);
         epState.addApplicationState(ApplicationState.STATUS_WITH_PORT, shutdown);
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 94256cca1b..d2824ccd12 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -136,6 +136,22 @@ public class ClusterUtils
         cluster.stream().forEach(ClusterUtils::stopUnchecked);
     }
 
+    /**
+     * Create a new instance and add it to the cluster, without starting it.
+     *
+     * @param cluster to add to
+     * @param other config to copy from
+     * @param fn function to add to the config before starting
+     * @param <I> instance type
+     * @return the instance added
+     */
+    public static <I extends IInstance> I addInstance(AbstractCluster<I> cluster,
+                                                      IInstanceConfig other,
+                                                      Consumer<IInstanceConfig> fn)
+    {
+        return addInstance(cluster, other.localDatacenter(), other.localRack(), fn);
+    }
+
     /**
      * Create a new instance and add it to the cluster, without starting it.
      *
diff --git a/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipShutdownTest.java b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipShutdownTest.java
new file mode 100644
index 0000000000..59ac4feeeb
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/gossip/GossipShutdownTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.gossip;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.net.Verb;
+
+public class GossipShutdownTest extends TestBaseImpl
+{
+    @Test
+    public void emptyStateAndShutdownEvent() throws IOException
+    {
+        try (Cluster cluster = builder()
+                               .withNodes(2)
+                               .withConfig(c -> c.with(Feature.values()))
+                               .start())
+        {
+            IInvokableInstance n1 = cluster.get(1);
+            IInvokableInstance n2 = cluster.get(2);
+            ClusterUtils.awaitRingJoin(n1, n2);
+            ClusterUtils.awaitRingJoin(n2, n1);
+
+            ClusterUtils.stopUnchecked(n1);
+
+            // make sure we don't see gossip events, but do allow gossip shutdown
+            cluster.filters().verbs(Verb.GOSSIP_DIGEST_ACK.id,
+                                    Verb.GOSSIP_DIGEST_ACK.id,
+                                    Verb.GOSSIP_DIGEST_ACK2.id)
+                   .to(1)
+                   .drop();
+
+            n1.startup();
+
+            n2.nodetoolResult("disablegossip").asserts().success();
+        }
+    }
+
+    @Test
+    public void emptyStateAndHostJoin() throws IOException, InterruptedException
+    {
+        try (Cluster cluster = builder()
+                               .withNodes(2)
+                               .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(3))
+                               .withConfig(c -> c.with(Feature.values()))
+                               .start())
+        {
+            IInvokableInstance n1 = cluster.get(1);
+            IInvokableInstance n2 = cluster.get(2);
+
+            ClusterUtils.awaitRingJoin(n1, n2);
+            ClusterUtils.awaitRingJoin(n2, n1);
+
+            ClusterUtils.stopUnchecked(n1);
+
+            // make sure we don't see gossip events, but do allow gossip shutdown
+            cluster.filters().verbs(Verb.GOSSIP_DIGEST_ACK.id,
+                                    Verb.GOSSIP_DIGEST_ACK.id,
+                                    Verb.GOSSIP_DIGEST_ACK2.id)
+                   .to(1)
+                   .drop();
+
+            n1.startup();
+
+            CountDownLatch latch = new CountDownLatch(1);
+            cluster.filters().verbs(Verb.GOSSIP_SHUTDOWN.id).messagesMatching((from, to, iMessage) -> {
+                latch.countDown();
+                return false;
+            }).drop();
+
+            n2.nodetoolResult("disablegossip").asserts().success();
+
+            latch.await();
+            cluster.filters().reset();
+
+            IInvokableInstance n3 = ClusterUtils.addInstance(cluster, n1.config(), c -> {});
+
+            n3.startup();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to