Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 7b83334f3 -> 28865c27c


Add missing follow-on fix for 7816, which was only applied to the cassandra-2.1 branch in
763130bdbde2f4cec2e8973bcd5203caf51cc89f


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/def4835e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/def4835e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/def4835e

Branch: refs/heads/cassandra-2.1
Commit: def4835e6b7de2b6523e2f69f53d9070e4a54a6c
Parents: b24bd08
Author: T Jake Luciani <[email protected]>
Authored: Thu Apr 9 09:14:30 2015 -0400
Committer: T Jake Luciani <[email protected]>
Committed: Thu Apr 9 09:25:06 2015 -0400

----------------------------------------------------------------------
 src/java/org/apache/cassandra/gms/EndpointState.java | 12 ------------
 src/java/org/apache/cassandra/gms/Gossiper.java      | 12 ++----------
 src/java/org/apache/cassandra/transport/Server.java  | 11 +++++++++--
 3 files changed, 11 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/gms/EndpointState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 518e575..3df9155 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -46,14 +46,12 @@ public class EndpointState
     /* fields below do not get serialized */
     private volatile long updateTimestamp;
     private volatile boolean isAlive;
-    private volatile boolean hasPendingEcho;
 
     EndpointState(HeartBeatState initialHbState)
     {
         hbState = initialHbState;
         updateTimestamp = System.nanoTime();
         isAlive = true;
-        hasPendingEcho = false;
     }
 
     HeartBeatState getHeartBeatState()
@@ -115,16 +113,6 @@ public class EndpointState
         isAlive = false;
     }
 
-    public boolean hasPendingEcho()
-    {
-        return hasPendingEcho;
-    }
-
-    public void markPendingEcho(boolean val)
-    {
-        hasPendingEcho = val;
-    }
-
     public String toString()
     {
         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = 
" + applicationState;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index 97dc506..962a358 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -877,12 +877,6 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
             return;
         }
 
-        if (localState.hasPendingEcho())
-        {
-            logger.debug("{} has already a pending echo, skipping it", 
localState);
-            return;
-        }
-
         localState.markDead();
 
         MessageOut<EchoMessage> echoMessage = new 
MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), 
EchoMessage.serializer);
@@ -896,19 +890,17 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
 
             public void response(MessageIn msg)
             {
-                localState.markPendingEcho(false);
                 realMarkAlive(addr, localState);
             }
         };
 
-        localState.markPendingEcho(true);
         MessagingService.instance().sendRR(echoMessage, addr, echoHandler);
     }
 
     private void realMarkAlive(final InetAddress addr, final EndpointState 
localState)
     {
         if (logger.isTraceEnabled())
-                logger.trace("marking as alive {}", addr);
+            logger.trace("marking as alive {}", addr);
         localState.markAlive();
         localState.updateTimestamp(); // prevents doStatusCheck from racing us 
and evicting if it was down > aVeryLongTime
         liveEndpoints.add(addr);
@@ -919,7 +911,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onAlive(addr, localState);
         if (logger.isTraceEnabled())
-                logger.trace("Notified " + subscribers);
+            logger.trace("Notified " + subscribers);
     }
 
     private void markDead(InetAddress addr, EndpointState localState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index 30b8a9d..d5a5d0c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -22,7 +22,9 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.EnumMap;
+import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLContext;
@@ -318,6 +320,7 @@ public class Server implements CassandraDaemon.Server
     private static class EventNotifier implements 
IEndpointLifecycleSubscriber, IMigrationListener
     {
         private final Server server;
+        private final Map<InetAddress, Event.StatusChange.Status> 
lastStatusChange = new ConcurrentHashMap<>();
         private static final InetAddress bindAll;
         static {
             try
@@ -370,12 +373,16 @@ public class Server implements CassandraDaemon.Server
 
         public void onUp(InetAddress endpoint)
         {
-            
server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint),
 server.socket.getPort()));
+            Event.StatusChange.Status prev = lastStatusChange.put(endpoint, 
Event.StatusChange.Status.UP);
+            if (prev == null || prev != Event.StatusChange.Status.UP)
+                
server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint),
 server.socket.getPort()));
         }
 
         public void onDown(InetAddress endpoint)
         {
-            
server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint),
 server.socket.getPort()));
+            Event.StatusChange.Status prev = lastStatusChange.put(endpoint, 
Event.StatusChange.Status.DOWN);
+            if (prev == null || prev != Event.StatusChange.Status.DOWN)
+                
server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint),
 server.socket.getPort()));
         }
 
         public void onCreateKeyspace(String ksName)

Reply via email to