This is an automated email from the ASF dual-hosted git repository.
brandonwilliams pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new f6d1951 Ignore stale ack in shadow round
f6d1951 is described below
commit f6d19512c4d79f800371da1e54dfe01cae5d894e
Author: Brandon Williams <[email protected]>
AuthorDate: Wed Apr 14 13:49:21 2021 -0500
Ignore stale ack in shadow round
Patch by brandonwilliams, samt, and Matt Fleming, reviewed by samt for
CASSANDRA-16588
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 5 ++
src/java/org/apache/cassandra/gms/Gossiper.java | 16 +++-
.../apache/cassandra/service/StorageService.java | 2 +-
.../org/apache/cassandra/gms/ShadowRoundTest.java | 94 ++++++++++++++++++++++
5 files changed, 116 insertions(+), 2 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d5ad726..443c2bf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.11.11
+ * Ignore stale acks received in the shadow round (CASSANDRA-16588)
* Add autocomplete and error messages for provide_overlapping_tombstones
(CASSANDRA-16350)
* Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName)
(CASSANDRA-16447)
* Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 674b597..b587635 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -83,6 +83,11 @@ public class EndpointState
return applicationState.get().get(key);
}
+ public boolean containsApplicationState(ApplicationState key)
+ {
+ return applicationState.get().containsKey(key);
+ }
+
public Set<Map.Entry<ApplicationState, VersionedValue>> states()
{
return applicationState.get().entrySet();
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 69f7fee..6bc25b6 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.gms;
-import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
@@ -1701,12 +1700,27 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
return anyNodeOn30;
}
+ public boolean sufficientForStartupSafetyCheck(Map<InetAddress,
EndpointState> epStateMap)
+ {
+ // it is possible for a previously queued ack to be sent to us when we come back up in shadow
+ EndpointState localState =
epStateMap.get(FBUtilities.getBroadcastAddress());
+ // return false if response doesn't contain state necessary for safety check
+ return localState == null || isDeadState(localState) ||
localState.containsApplicationState(ApplicationState.HOST_ID);
+ }
+
protected void maybeFinishShadowRound(InetAddress respondent, boolean
isInShadowRound, Map<InetAddress, EndpointState> epStateMap)
{
if (inShadowRound)
{
if (!isInShadowRound)
{
+ if (!sufficientForStartupSafetyCheck(epStateMap))
+ {
+ logger.debug("Not exiting shadow round because received
ACK with insufficient states {} -> {}",
+ FBUtilities.getBroadcastAddress(),
epStateMap.get(FBUtilities.getBroadcastAddress()));
+ return;
+ }
+
if (!seeds.contains(respondent))
logger.warn("Received an ack from {}, who isn't a seed.
Ensure your seed list includes a live node. Exiting shadow round",
respondent);
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index da3a4a8..89af682 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -616,7 +616,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
return localHostId;
}
- private synchronized void checkForEndpointCollision(UUID localHostId,
Set<InetAddress> peers) throws ConfigurationException
+ public synchronized void checkForEndpointCollision(UUID localHostId,
Set<InetAddress> peers) throws ConfigurationException
{
if (Boolean.getBoolean("cassandra.allow_unsafe_join"))
{
diff --git a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
index bc18813..a7368f4 100644
--- a/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
+++ b/test/unit/org/apache/cassandra/gms/ShadowRoundTest.java
@@ -19,17 +19,26 @@
package org.apache.cassandra.gms;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.cassandra.dht.IPartitioner;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PropertyFileSnitch;
@@ -38,6 +47,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MockMessagingService;
import org.apache.cassandra.net.MockMessagingSpy;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.net.MockMessagingService.verb;
import static org.junit.Assert.assertEquals;
@@ -113,4 +123,88 @@ public class ShadowRoundTest
assertEquals(0, spyAck2.messagesIntercepted());
assertEquals(0, spyMigrationReq.messagesIntercepted());
}
+
+ @Test
+ public void testBadAckInShadow()
+ {
+ final AtomicBoolean ackSend = new AtomicBoolean(false);
+ MockMessagingSpy spySyn =
MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+ .respondN((msgOut, to) ->
+ {
+ // ACK with bad data in shadow round
+ if (!ackSend.compareAndSet(false, true))
+ {
+ while (!Gossiper.instance.isEnabled()) ;
+ }
+ InetAddress junkaddr;
+ try
+ {
+ junkaddr = InetAddress.getByName("1.1.1.1");
+ }
+ catch (UnknownHostException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ HeartBeatState hb = new HeartBeatState(123, 456);
+ EndpointState state = new EndpointState(hb);
+ List<GossipDigest> gDigests = new
ArrayList<GossipDigest>();
+ gDigests.add(new
GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(),
hb.getHeartBeatVersion()));
+ gDigests.add(new GossipDigest(junkaddr,
hb.getGeneration(), hb.getHeartBeatVersion()));
+ Map<InetAddress, EndpointState> smap = new
HashMap<InetAddress, EndpointState>()
+ {
+ {
+ put(FBUtilities.getBroadcastAddress(), state);
+ put(junkaddr, state);
+ }
+ };
+ GossipDigestAck payload = new GossipDigestAck(gDigests,
smap);
+
+ logger.debug("Simulating bad digest ACK reply");
+ return MessageIn.create(to, payload,
Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK,
MessagingService.current_version);
+ }, 1);
+
+ System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+ try
+ {
+
StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(),
SystemKeyspace.loadHostIds().keySet());
+ }
+ catch (Exception e)
+ {
+ assertEquals("Unable to gossip with any peers", e.getMessage());
+ }
+ System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+ }
+
+ @Test
+ public void testPreviouslyAssassinatedInShadow()
+ {
+ final AtomicBoolean ackSend = new AtomicBoolean(false);
+ MockMessagingSpy spySyn =
MockMessagingService.when(verb(MessagingService.Verb.GOSSIP_DIGEST_SYN))
+ .respondN((msgOut, to) ->
+ {
+ // ACK with self assassinated in shadow round
+ if (!ackSend.compareAndSet(false, true))
+ {
+ while (!Gossiper.instance.isEnabled()) ;
+ }
+ HeartBeatState hb = new HeartBeatState(123, 456);
+ EndpointState state = new EndpointState(hb);
+ state.addApplicationState(ApplicationState.STATUS,
+ new
VersionedValue.VersionedValueFactory(DatabaseDescriptor.getPartitioner()).left(
+
Collections.singletonList(DatabaseDescriptor.getPartitioner().getRandomToken()),
1L));
+ GossipDigestAck payload = new GossipDigestAck(
+ Collections.singletonList(new
GossipDigest(FBUtilities.getBroadcastAddress(), hb.getGeneration(),
hb.getHeartBeatVersion())),
+
Collections.singletonMap(FBUtilities.getBroadcastAddress(), state));
+
+ logger.debug("Simulating bad digest ACK reply");
+ return MessageIn.create(to, payload,
Collections.emptyMap(), MessagingService.Verb.GOSSIP_DIGEST_ACK,
MessagingService.current_version);
+ }, 1);
+
+
+ System.setProperty(Config.PROPERTY_PREFIX + "auto_bootstrap", "false");
+
StorageService.instance.checkForEndpointCollision(SystemKeyspace.getLocalHostId(),
SystemKeyspace.loadHostIds().keySet());
+ System.clearProperty(Config.PROPERTY_PREFIX + "auto_bootstrap");
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]