Repository: cassandra
Updated Branches:
  refs/heads/trunk cc0247b12 -> ff5ed7a03
Delay "node up" and "node added" notifications until native protocol server is started Patch by brandonwilliams and stefania, reviewed by brandonwilliams for CASSANDRA-8236 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ff5ed7a0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ff5ed7a0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ff5ed7a0 Branch: refs/heads/trunk Commit: ff5ed7a03f7b9968c0156b05226af67882e5670e Parents: cc0247b Author: Brandon Williams <[email protected]> Authored: Mon Mar 30 09:18:26 2015 -0500 Committer: Brandon Williams <[email protected]> Committed: Mon Mar 30 09:18:26 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/gms/ApplicationState.java | 1 + .../org/apache/cassandra/gms/EndpointState.java | 17 ++++ src/java/org/apache/cassandra/gms/Gossiper.java | 15 ++-- .../apache/cassandra/gms/VersionedValue.java | 5 ++ .../cassandra/service/CassandraDaemon.java | 2 + .../cassandra/service/StorageService.java | 83 +++++++++++++++--- .../org/apache/cassandra/transport/Server.java | 88 +++++++++++++++++--- 8 files changed, 186 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e66b724..8b95fb3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Delay "node up" and "node added" notifications until native protocol server is started (CASSANDRA-8236) * Compressed Commit Log (CASSANDRA-6809) * Optimise IntervalTree (CASSANDRA-8988) * Add a key-value payload for third party usage (CASSANDRA-8553) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/ApplicationState.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index 777dfc5..ade9208 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -33,6 +33,7 @@ public enum ApplicationState NET_VERSION, HOST_ID, TOKENS, + RPC_READY, // pad to allow adding new states to existing cluster X1, X2, http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/EndpointState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java b/src/java/org/apache/cassandra/gms/EndpointState.java index 1029374..0e6985a 100644 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@ -114,6 +114,23 @@ public class EndpointState isAlive = false; } + public boolean isRpcReady() + { + VersionedValue rpcState = getApplicationState(ApplicationState.RPC_READY); + return rpcState != null && Boolean.parseBoolean(rpcState.value); + } + + public String getStatus() + { + VersionedValue status = getApplicationState(ApplicationState.STATUS); + if (status == null) + return ""; + + String[] pieces = status.value.split(VersionedValue.DELIMITER_STR, -1); + assert (pieces.length > 0); + return pieces[0]; + } + public String toString() { return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState; http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index ff1240a..07f2615 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ 
b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -979,14 +979,19 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean subscriber.onJoin(ep, epState); } + public boolean isAlive(InetAddress endpoint) + { + EndpointState epState = getEndpointStateForEndpoint(endpoint); + if (epState == null) + return false; + return epState.isAlive() && !isDeadState(epState); + } + public boolean isDeadState(EndpointState epState) { - if (epState.getApplicationState(ApplicationState.STATUS) == null) + String state = epState.getStatus(); + if (state.isEmpty()) return false; - String value = epState.getApplicationState(ApplicationState.STATUS).value; - String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1); - assert (pieces.length > 0); - String state = pieces[0]; for (String deadstate : DEAD_STATES) { if (state.equals(deadstate)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/gms/VersionedValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index e8cf748..203f3a7 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -212,6 +212,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(VersionedValue.HIBERNATE + VersionedValue.DELIMITER + value); } + public VersionedValue rpcReady(boolean value) + { + return new VersionedValue(String.valueOf(value)); + } + public VersionedValue datacenter(String dcId) { return new VersionedValue(dcId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 74e356d..d6b2d24 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -425,7 +425,9 @@ public class CassandraDaemon { String nativeFlag = System.getProperty("cassandra.start_native_transport"); if ((nativeFlag != null && Boolean.parseBoolean(nativeFlag)) || (nativeFlag == null && DatabaseDescriptor.startNativeTransport())) + { nativeServer.start(); + } else logger.info("Not starting native transport as requested. Use JMX (StorageService->startNativeTransport()) or nodetool (enablebinary) to start it"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index a75c08c..40686e5 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1541,6 +1541,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE case HOST_ID: SystemKeyspace.updatePeerInfo(endpoint, "host_id", UUID.fromString(value.value)); break; + case RPC_READY: + notifyRpcChange(endpoint, epState.isRpcReady()); + break; } } } @@ -1587,6 +1590,71 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return vvalue.getBytes(ISO_8859_1); } + private void notifyRpcChange(InetAddress endpoint, boolean ready) + { + if (ready) + { + notifyUp(endpoint); + notifyJoined(endpoint); + } + else + { + notifyDown(endpoint); + } + } + + private void notifyUp(InetAddress endpoint) + { + if (!isRpcReady(endpoint) || !Gossiper.instance.isAlive(endpoint)) + return; + + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onUp(endpoint); + } + + 
private void notifyDown(InetAddress endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onDown(endpoint); + } + + private void notifyJoined(InetAddress endpoint) + { + if (!isRpcReady(endpoint) || !isStatus(endpoint, VersionedValue.STATUS_NORMAL)) + return; + + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onJoinCluster(endpoint); + } + + private void notifyMoved(InetAddress endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onMove(endpoint); + } + + private void notifyLeft(InetAddress endpoint) + { + for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) + subscriber.onLeaveCluster(endpoint); + } + + private boolean isStatus(InetAddress endpoint, String status) + { + return Gossiper.instance.getEndpointStateForEndpoint(endpoint).getStatus().equals(status); + } + + private boolean isRpcReady(InetAddress endpoint) + { + return MessagingService.instance().getVersion(endpoint) < MessagingService.VERSION_30 || + Gossiper.instance.getEndpointStateForEndpoint(endpoint).isRpcReady(); + } + + public void setRpcReady(boolean value) + { + Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_READY, valueFactory.rpcReady(value)); + } + private Collection<Token> getTokensFor(InetAddress endpoint) { try @@ -1756,13 +1824,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (isMoving || operationMode == Mode.MOVING) { tokenMetadata.removeFromMoving(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onMove(endpoint); + notifyMoved(endpoint); } else { - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onJoinCluster(endpoint); + notifyJoined(endpoint); } PendingRangeCalculatorService.instance.update(); @@ -1902,8 +1968,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE 
tokenMetadata.removeEndpoint(endpoint); tokenMetadata.removeBootstrapTokens(tokens); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onLeaveCluster(endpoint); + notifyLeft(endpoint); PendingRangeCalculatorService.instance.update(); } @@ -2118,8 +2183,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (tokenMetadata.isMember(endpoint)) { HintedHandOffManager.instance.scheduleHintDelivery(endpoint, true); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onUp(endpoint); + notifyUp(endpoint); } } @@ -2132,8 +2196,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void onDead(InetAddress endpoint, EndpointState state) { MessagingService.instance().convict(endpoint); - for (IEndpointLifecycleSubscriber subscriber : lifecycleSubscribers) - subscriber.onDown(endpoint); + notifyDown(endpoint); } public void onRestart(InetAddress endpoint, EndpointState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ff5ed7a0/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 3ef7162..c7c1bdb 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -180,6 +180,8 @@ public class Server implements CassandraDaemon.Server connectionTracker.allChannels.add(bindFuture.channel()); isRunning.set(true); + + StorageService.instance.setRpcReady(true); } private void registerMetrics() @@ -204,6 +206,8 @@ public class Server implements CassandraDaemon.Server eventExecutorGroup.shutdown(); eventExecutorGroup = null; logger.info("Stop listening for CQL clients"); + + StorageService.instance.setRpcReady(false); } @@ -211,7 +215,7 @@ public class Server implements CassandraDaemon.Server 
{ // TODO: should we be using the GlobalEventExecutor or defining our own? public final ChannelGroup allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<Event.Type, ChannelGroup>(Event.Type.class); + private final EnumMap<Event.Type, ChannelGroup> groups = new EnumMap<>(Event.Type.class); public ConnectionTracker() { @@ -333,10 +337,48 @@ public class Server implements CassandraDaemon.Server } } + private static class LatestEvent + { + public final Event.StatusChange.Status status; + public final Event.TopologyChange.Change topology; + + private LatestEvent(Event.StatusChange.Status status, Event.TopologyChange.Change topology) + { + this.status = status; + this.topology = topology; + } + + @Override + public String toString() + { + return String.format("Status %s, Topology %s", status, topology); + } + + public static LatestEvent forStatusChange(Event.StatusChange.Status status, LatestEvent prev) + { + return new LatestEvent(status, + prev == null ? + null : + prev.topology); + } + + public static LatestEvent forTopologyChange(Event.TopologyChange.Change change, LatestEvent prev) + { + return new LatestEvent(prev == null ? 
+ null : + prev.status, + change); + } + } + private static class EventNotifier extends MigrationListener implements IEndpointLifecycleSubscriber { private final Server server; - private final Map<InetAddress, Event.StatusChange.Status> lastStatusChange = new ConcurrentHashMap<>(); + + // We keep track of the latest events we have sent to avoid sending duplicates + // since StorageService may send duplicate notifications (CASSANDRA-7816, CASSANDRA-8236) + private final Map<InetAddress, LatestEvent> latestEvents = new ConcurrentHashMap<>(); + private static final InetAddress bindAll; static { try @@ -376,31 +418,55 @@ public class Server implements CassandraDaemon.Server public void onJoinCluster(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.newNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onLeaveCluster(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.removedNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onMove(InetAddress endpoint) { - server.connectionTracker.send(Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort())); + onTopologyChange(endpoint, Event.TopologyChange.movedNode(getRpcAddress(endpoint), server.socket.getPort())); } public void onUp(InetAddress endpoint) { - Event.StatusChange.Status prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.UP); - if (prev == null || prev != Event.StatusChange.Status.UP) - server.connectionTracker.send(Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); + onStatusChange(endpoint, Event.StatusChange.nodeUp(getRpcAddress(endpoint), server.socket.getPort())); } public void onDown(InetAddress endpoint) { - Event.StatusChange.Status 
prev = lastStatusChange.put(endpoint, Event.StatusChange.Status.DOWN); - if (prev == null || prev != Event.StatusChange.Status.DOWN) - server.connectionTracker.send(Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + onStatusChange(endpoint, Event.StatusChange.nodeDown(getRpcAddress(endpoint), server.socket.getPort())); + } + + private void onTopologyChange(InetAddress endpoint, Event.TopologyChange event) + { + if (logger.isTraceEnabled()) + logger.trace("Topology changed event : {}, {}", endpoint, event.change); + + LatestEvent prev = latestEvents.get(endpoint); + if (prev == null || prev.topology != event.change) + { + LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forTopologyChange(event.change, prev)); + if (ret == prev) + server.connectionTracker.send(event); + } + } + + private void onStatusChange(InetAddress endpoint, Event.StatusChange event) + { + if (logger.isTraceEnabled()) + logger.trace("Status changed event : {}, {}", endpoint, event.status); + + LatestEvent prev = latestEvents.get(endpoint); + if (prev == null || prev.status != event.status) + { + LatestEvent ret = latestEvents.put(endpoint, LatestEvent.forStatusChange(event.status, prev)); + if (ret == prev) + server.connectionTracker.send(event); + } } public void onCreateKeyspace(String ksName)
