Fix duplicate up/down messages sent to native clients

Patch by Stefania, reviewed by brandonwilliams for CASSANDRA-7816


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2199a87a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2199a87a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2199a87a

Branch: refs/heads/cassandra-2.1
Commit: 2199a87aab8322c41f1b590c0fd8f08f448952ca
Parents: 77c66bf
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Fri Mar 13 08:02:12 2015 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Fri Mar 13 08:02:12 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/gms/EndpointState.java | 12 ++++++++++
 src/java/org/apache/cassandra/gms/Gossiper.java | 25 +++++++++++++++-----
 3 files changed, 32 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 382b3dd..8843908 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.14:
+ * Fix duplicate up/down messages sent to native clients (CASSANDRA-7816)
  * Expose commit log archive status via JMX (CASSANDRA-8734)
  * Provide better exceptions for invalid replication strategy parameters
    (CASSANDRA-8909)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java
index 3df9155..518e575 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -46,12 +46,14 @@ public class EndpointState
     /* fields below do not get serialized */
     private volatile long updateTimestamp;
     private volatile boolean isAlive;
+    private volatile boolean hasPendingEcho;
 
     EndpointState(HeartBeatState initialHbState)
     {
         hbState = initialHbState;
         updateTimestamp = System.nanoTime();
         isAlive = true;
+        hasPendingEcho = false;
     }
 
     HeartBeatState getHeartBeatState()
@@ -113,6 +115,16 @@ public class EndpointState
         isAlive = false;
     }
 
+    public boolean hasPendingEcho()
+    {
+        return hasPendingEcho;
+    }
+
+    public void markPendingEcho(boolean val)
+    {
+        hasPendingEcho = val;
+    }
+
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2199a87a/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index a478405..97dc506 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -29,6 +29,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
 
 import org.apache.cassandra.utils.Pair;
@@ -48,8 +49,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 
-import com.google.common.collect.ImmutableList;
-
 /**
  * This module is responsible for Gossiping information for the local endpoint. This abstraction
  * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
@@ -878,6 +877,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             return;
         }
 
+        if (localState.hasPendingEcho())
+        {
+            logger.debug("{} has already a pending echo, skipping it", localState);
+            return;
+        }
+
         localState.markDead();
 
         MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer);
@@ -891,9 +896,12 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
             public void response(MessageIn msg)
             {
+                localState.markPendingEcho(false);
                 realMarkAlive(addr, localState);
             }
         };
+
+        localState.markPendingEcho(true);
         MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
     }
 
@@ -936,9 +944,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
      */
     private void handleMajorStateChange(InetAddress ep, EndpointState epState)
     {
+        EndpointState localEpState = endpointStateMap.get(ep);
         if (!isDeadState(epState))
         {
-            if (endpointStateMap.get(ep) != null)
+            if (localEpState != null)
                 logger.info("Node {} has restarted, now UP", ep);
             else
                 logger.info("Node {} is now part of the cluster", ep);
@@ -947,9 +956,11 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
 
-        // the node restarted: it is up to the subscriber to take whatever action is necessary
-        for (IEndpointStateChangeSubscriber subscriber : subscribers)
-            subscriber.onRestart(ep, epState);
+        if (localEpState != null)
+        {   // the node restarted: it is up to the subscriber to take whatever action is necessary
+            for (IEndpointStateChangeSubscriber subscriber : subscribers)
+                subscriber.onRestart(ep, localEpState);
+        }
 
         if (!isDeadState(epState))
             markAlive(ep, epState);
@@ -994,6 +1005,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
 
             EndpointState localEpStatePtr = endpointStateMap.get(ep);
             EndpointState remoteState = entry.getValue();
+
             /*
                 If state does not exist just add it. If it does then add it if the remote generation is greater.
                 If there is a generation tie, attempt to break it by heartbeat version.
@@ -1024,6 +1036,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
                     }
                     else if (logger.isTraceEnabled())
                             logger.trace("Ignoring remote version " + remoteMaxVersion + " <= " + localMaxVersion + " for " + ep);
+
                     if (!localEpStatePtr.isAlive() && !isDeadState(localEpStatePtr)) // unless of course, it was dead
                         markAlive(ep, localEpStatePtr);
                 }

Reply via email to