Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 7b83334f3 -> 28865c27c
Add missing follow on fix for 7816 only applied to cassandra-2.1 branch in 763130bdbde2f4cec2e8973bcd5203caf51cc89f Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/def4835e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/def4835e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/def4835e Branch: refs/heads/cassandra-2.1 Commit: def4835e6b7de2b6523e2f69f53d9070e4a54a6c Parents: b24bd08 Author: T Jake Luciani <[email protected]> Authored: Thu Apr 9 09:14:30 2015 -0400 Committer: T Jake Luciani <[email protected]> Committed: Thu Apr 9 09:25:06 2015 -0400 ---------------------------------------------------------------------- src/java/org/apache/cassandra/gms/EndpointState.java | 12 ------------ src/java/org/apache/cassandra/gms/Gossiper.java | 12 ++---------- src/java/org/apache/cassandra/transport/Server.java | 11 +++++++++-- 3 files changed, 11 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 518e575..3df9155 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -46,14 +46,12 @@ public class EndpointState /* fields below do not get serialized */ private volatile long updateTimestamp; private volatile boolean isAlive; - private volatile boolean hasPendingEcho; EndpointState(HeartBeatState initialHbState) { hbState = initialHbState; updateTimestamp = System.nanoTime(); isAlive = true; - hasPendingEcho = false; } HeartBeatState getHeartBeatState() @@ -115,16 +113,6 @@ public class EndpointState isAlive = false; } - public boolean hasPendingEcho() - { - return hasPendingEcho; - } - - public void markPendingEcho(boolean val) - { - hasPendingEcho = val; - } - public String toString() { return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 97dc506..962a358 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -877,12 +877,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return; } - if (localState.hasPendingEcho()) - { - logger.debug("{} has already a pending echo, skipping it", localState); - return; - } - localState.markDead(); MessageOut<EchoMessage> echoMessage = new MessageOut<EchoMessage>(MessagingService.Verb.ECHO, new EchoMessage(), EchoMessage.serializer); @@ -896,19 +890,17 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean public void response(MessageIn msg) { - localState.markPendingEcho(false); realMarkAlive(addr, localState); } }; - localState.markPendingEcho(true); MessagingService.instance().sendRR(echoMessage, addr, echoHandler); } private void realMarkAlive(final InetAddress addr, final EndpointState localState) { if (logger.isTraceEnabled()) - logger.trace("marking as alive {}", addr); + logger.trace("marking as alive {}", addr); localState.markAlive(); localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime liveEndpoints.add(addr); @@ -919,7 +911,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean for (IEndpointStateChangeSubscriber subscriber : subscribers) subscriber.onAlive(addr, localState); if (logger.isTraceEnabled()) - logger.trace("Notified " + subscribers); + logger.trace("Notified " + subscribers); } private void markDead(InetAddress addr, EndpointState localState) http://git-wip-us.apache.org/repos/asf/cassandra/blob/def4835e/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 30b8a9d..d5a5d0c 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -22,7 +22,9 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.EnumMap; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.SSLContext; @@ -318,6 +320,7 @@ public class Server implements CassandraDaemon.Server private static class EventNotifier implements IEndpointLifecycleSubscriber, IMigrationListener { private final Server server; + private final Map<InetAddress, Event.StatusChange.Status> lastStatusChange = new ConcurrentHashMap<>(); private static final InetAddress bindAll; static { try @@ -370,12 +373,16 @@ public class Server implements CassandraDaemon.Server public void onUp(InetAddress endpoint) { - server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); + Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP); + if (prev == null || prev != Event.StatusChange.Status.UP) + server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } public void onDown(InetAddress endpoint) { - server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN); + if (prev == null || prev != Event.StatusChange.Status.DOWN) + server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); } public void onCreateKeyspace(String ksName)
