Discard in-flight shadow round responses

patch by Stefan Podkowinski; reviewed by Joel Knighton and Jason Brown for 
CASSANDRA-12653


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bf0906b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bf0906b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bf0906b9

Branch: refs/heads/trunk
Commit: bf0906b92cf65161d828e31bc46436d427bbb4b8
Parents: 06316df
Author: Stefan Podkowinski <s.podkowin...@gmail.com>
Authored: Mon Sep 19 13:56:54 2016 +0200
Committer: Joel Knighton <j...@apache.org>
Committed: Wed Mar 22 13:08:28 2017 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../gms/GossipDigestAckVerbHandler.java         | 26 +++++---
 src/java/org/apache/cassandra/gms/Gossiper.java | 62 +++++++++++++++-----
 .../apache/cassandra/service/MigrationTask.java | 12 ++--
 .../cassandra/service/StorageService.java       | 16 +++--
 5 files changed, 79 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 27dd343..df2421d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.10
+ * Discard in-flight shadow round responses (CASSANDRA-12653)
  * Don't anti-compact repaired data to avoid inconsistencies (CASSANDRA-13153)
  * Wrong logger name in AnticompactionTask (CASSANDRA-13343)
  * Fix queries updating multiple time the same list (CASSANDRA-13130)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java 
b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
index 9f69a94..59060f8 100644
--- a/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
+++ b/src/java/org/apache/cassandra/gms/GossipDigestAckVerbHandler.java
@@ -51,21 +51,31 @@ public class GossipDigestAckVerbHandler implements 
IVerbHandler<GossipDigestAck>
         Map<InetAddress, EndpointState> epStateMap = 
gDigestAckMessage.getEndpointStateMap();
         logger.trace("Received ack with {} digests and {} states", 
gDigestList.size(), epStateMap.size());
 
-        if (epStateMap.size() > 0)
-        {
-            /* Notify the Failure Detector */
-            Gossiper.instance.notifyFailureDetector(epStateMap);
-            Gossiper.instance.applyStateLocally(epStateMap);
-        }
-
         if (Gossiper.instance.isInShadowRound())
         {
             if (logger.isDebugEnabled())
                 logger.debug("Finishing shadow round with {}", from);
-            Gossiper.instance.finishShadowRound();
+            Gossiper.instance.finishShadowRound(epStateMap);
             return; // don't bother doing anything else, we have what we came 
for
         }
 
+        if (epStateMap.size() > 0)
+        {
+            // Ignore any GossipDigestAck messages that we handle before a 
regular GossipDigestSyn has been send.
+            // This will prevent Acks from leaking over from the shadow round 
that are not actual part of
+            // the regular gossip conversation.
+            if ((System.nanoTime() - Gossiper.instance.firstSynSendAt) < 0 || 
Gossiper.instance.firstSynSendAt == 0)
+            {
+                if (logger.isTraceEnabled())
+                    logger.trace("Ignoring unrequested GossipDigestAck from 
{}", from);
+                return;
+            }
+
+            /* Notify the Failure Detector */
+            Gossiper.instance.notifyFailureDetector(epStateMap);
+            Gossiper.instance.applyStateLocally(epStateMap);
+        }
+
         /* Get the state required to send to this gossipee - construct 
GossipDigestAck2Message */
         Map<InetAddress, EndpointState> deltaEpStateMap = new 
HashMap<InetAddress, EndpointState>();
         for (GossipDigest gDigest : gDigestList)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 06b14c4..c2eccba 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -30,6 +30,7 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -86,6 +87,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     private static final Logger logger = 
LoggerFactory.getLogger(Gossiper.class);
     public static final Gossiper instance = new Gossiper();
 
+    // Timestamp to prevent processing any in-flight messages for we've not 
send any SYN yet, see CASSANDRA-12653.
+    volatile long firstSynSendAt = 0L;
+
     public static final long aVeryLongTime = 259200 * 1000; // 3 days
 
     // Maximimum difference between generation value and local time we are 
willing to accept about a peer
@@ -125,6 +129,9 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     private volatile boolean inShadowRound = false;
 
+    // endpoint states as gathered during shadow round
+    private final Map<InetAddress, EndpointState> endpointShadowStateMap = new 
ConcurrentHashMap<>();
+
     private volatile long lastProcessedMessageAt = System.currentTimeMillis();
 
     private class GossipTask implements Runnable
@@ -645,6 +652,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         InetAddress to = liveEndpoints.get(index);
         if (logger.isTraceEnabled())
             logger.trace("Sending a GossipDigestSyn to {} ...", to);
+        if (firstSynSendAt == 0)
+            firstSynSendAt = System.nanoTime();
         MessagingService.instance().sendOneWay(message, to);
         return seeds.contains(to);
     }
@@ -713,11 +722,12 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * Check if this endpoint can safely bootstrap into the cluster.
      *
      * @param endpoint - the endpoint to check
+     * @param epStates - endpoint states in the cluster
      * @return true if the endpoint can join the cluster
      */
-    public boolean isSafeForBootstrap(InetAddress endpoint)
+    public boolean isSafeForBootstrap(InetAddress endpoint, Map<InetAddress, 
EndpointState> epStates)
     {
-        EndpointState epState = endpointStateMap.get(endpoint);
+        EndpointState epState = epStates.get(endpoint);
 
         // if there's no previous state, or the node was previously removed 
from the cluster, we're good
         if (epState == null || isDeadState(epState))
@@ -816,14 +826,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return endpointStateMap.get(ep);
     }
 
-    // removes ALL endpoint states; should only be called after shadow gossip
-    public void resetEndpointStateMap()
-    {
-        endpointStateMap.clear();
-        unreachableEndpoints.clear();
-        liveEndpoints.clear();
-    }
-
     public Set<Entry<InetAddress, EndpointState>> getEndpointStates()
     {
         return endpointStateMap.entrySet();
@@ -831,7 +833,12 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
     public UUID getHostId(InetAddress endpoint)
     {
-        return 
UUID.fromString(getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
+        return getHostId(endpoint, endpointStateMap);
+    }
+
+    public UUID getHostId(InetAddress endpoint, Map<InetAddress, 
EndpointState> epStates)
+    {
+        return 
UUID.fromString(epStates.get(endpoint).getApplicationState(ApplicationState.HOST_ID).value);
     }
 
     EndpointState getStateForVersionBiggerThan(InetAddress forEndpoint, int 
version)
@@ -1305,12 +1312,32 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
     }
 
     /**
-     *  Do a single 'shadow' round of gossip, where we do not modify any state
-     *  Only used when replacing a node, to get and assume its states
+     * Do a single 'shadow' round of gossip by retrieving endpoint states that 
will be stored exclusively in the
+     * map return value, instead of endpointStateMap.
+     *
+     * Used when preparing to join the ring:
+     * <ul>
+     *     <li>when replacing a node, to get and assume its tokens</li>
+     *     <li>when joining, to check that the local host id matches any 
previous id for the endpoint address</li>
+     * </ul>
+     *
+     * Method is synchronized, as we use an in-progress flag to indicate that 
shadow round must be cleared
+     * again by calling {@link Gossiper#finishShadowRound(Map)}. This will 
update
+     * {@link Gossiper#endpointShadowStateMap} with received values, in order 
to return an immutable copy to the
+     * caller of {@link Gossiper#doShadowRound()}. Therefor only a single 
shadow round execution is permitted at
+     * the same time.
+     *
+     * @return endpoint states gathered during shadow round or empty map
      */
-    public void doShadowRound()
+    public synchronized Map<InetAddress, EndpointState> doShadowRound()
     {
         buildSeedsList();
+        // it may be that the local address is the only entry in the seed
+        // list in which case, attempting a shadow round is pointless
+        if (seeds.isEmpty())
+            return endpointShadowStateMap;
+
+        endpointShadowStateMap.clear();
         // send a completely empty syn
         List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
         GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(DatabaseDescriptor.getClusterName(),
@@ -1346,6 +1373,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         {
             throw new RuntimeException(wtf);
         }
+
+        return ImmutableMap.copyOf(endpointShadowStateMap);
     }
 
     private void buildSeedsList()
@@ -1466,10 +1495,13 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return (scheduledGossipTask != null) && 
(!scheduledGossipTask.isCancelled());
     }
 
-    protected void finishShadowRound()
+    protected void finishShadowRound(Map<InetAddress, EndpointState> 
epStateMap)
     {
         if (inShadowRound)
+        {
+            endpointShadowStateMap.putAll(epStateMap);
             inShadowRound = false;
+        }
     }
 
     protected boolean isInShadowRound()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/MigrationTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java 
b/src/java/org/apache/cassandra/service/MigrationTask.java
index df0b767..b065d90 100644
--- a/src/java/org/apache/cassandra/service/MigrationTask.java
+++ b/src/java/org/apache/cassandra/service/MigrationTask.java
@@ -48,6 +48,12 @@ class MigrationTask extends WrappedRunnable
 
     public void runMayThrow() throws Exception
     {
+        if (!FailureDetector.instance.isAlive(endpoint))
+        {
+            logger.warn("Can't send schema pull request: node {} is down.", 
endpoint);
+            return;
+        }
+
         // There is a chance that quite some time could have passed between 
now and the MM#maybeScheduleSchemaPull(),
         // potentially enough for the endpoint node to restart - which is an 
issue if it does restart upgraded, with
         // a higher major.
@@ -57,12 +63,6 @@ class MigrationTask extends WrappedRunnable
             return;
         }
 
-        if (!FailureDetector.instance.isAlive(endpoint))
-        {
-            logger.debug("Can't send schema pull request: node {} is down.", 
endpoint);
-            return;
-        }
-
         MessageOut message = new 
MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, 
MigrationManager.MigrationsSerializer.instance);
 
         IAsyncCallback<Collection<Mutation>> cb = new 
IAsyncCallback<Collection<Mutation>>()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bf0906b9/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index c2996d7..65f536b 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -443,15 +443,15 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             MessagingService.instance().listen();
 
         // make magic happen
-        Gossiper.instance.doShadowRound();
+        Map<InetAddress, EndpointState> epStates = 
Gossiper.instance.doShadowRound();
 
         // now that we've gossiped at least once, we should be able to find 
the node we're replacing
-        if 
(Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress())==
 null)
+        if (epStates.get(DatabaseDescriptor.getReplaceAddress())== null)
             throw new RuntimeException("Cannot replace_address " + 
DatabaseDescriptor.getReplaceAddress() + " because it doesn't exist in gossip");
-        replacingId = 
Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress());
+        replacingId = 
Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress(), epStates);
         try
         {
-            VersionedValue tokensVersionedValue = 
Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
+            VersionedValue tokensVersionedValue = 
epStates.get(DatabaseDescriptor.getReplaceAddress()).getApplicationState(ApplicationState.TOKENS);
             if (tokensVersionedValue == null)
                 throw new RuntimeException("Could not find tokens for " + 
DatabaseDescriptor.getReplaceAddress() + " to replace");
             Collection<Token> tokens = 
TokenSerializer.deserialize(getPartitioner(), new DataInputStream(new 
ByteArrayInputStream(tokensVersionedValue.toBytes())));
@@ -460,7 +460,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             {
                 SystemKeyspace.setLocalHostId(replacingId); // use the 
replacee's host Id as our own so we receive hints, etc
             }
-            Gossiper.instance.resetEndpointStateMap(); // clean up since we 
have what we need
             return tokens;
         }
         catch (IOException e)
@@ -474,8 +473,8 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         logger.debug("Starting shadow gossip round to check for endpoint 
collision");
         if (!MessagingService.instance().isListening())
             MessagingService.instance().listen();
-        Gossiper.instance.doShadowRound();
-        if 
(!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress()))
+        Map<InetAddress, EndpointState> epStates = 
Gossiper.instance.doShadowRound();
+        if 
(!Gossiper.instance.isSafeForBootstrap(FBUtilities.getBroadcastAddress(), 
epStates))
         {
             throw new RuntimeException(String.format("A node with address %s 
already exists, cancelling join. " +
                                                      "Use 
cassandra.replace_address if you want to replace this node.",
@@ -483,7 +482,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         }
         if (useStrictConsistency && !allowSimultaneousMoves())
         {
-            for (Map.Entry<InetAddress, EndpointState> entry : 
Gossiper.instance.getEndpointStates())
+            for (Map.Entry<InetAddress, EndpointState> entry : 
epStates.entrySet())
             {
                 // ignore local node or empty status
                 if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) 
|| entry.getValue().getApplicationState(ApplicationState.STATUS) == null)
@@ -495,7 +494,6 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
                     throw new UnsupportedOperationException("Other 
bootstrapping/leaving/moving nodes detected, cannot bootstrap while 
cassandra.consistent.rangemovement is true");
             }
         }
-        Gossiper.instance.resetEndpointStateMap();
     }
 
     private boolean allowSimultaneousMoves()

Reply via email to