Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 58f25d526 -> cefaa4eb7
  refs/heads/cassandra-2.1 2200f7b1b -> 9bcdd0fb7
  refs/heads/trunk 33f37f86f -> c57984216
Close incoming connections when MessagingService is stopped

Patch by Sergio Bossa, reviewed by brandonwilliams for CASSANDRA-9238

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cefaa4eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cefaa4eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cefaa4eb

Branch: refs/heads/cassandra-2.0
Commit: cefaa4eb7168a63bbe5b04ad7b9bc093a34f4bb8
Parents: 58f25d5
Author: Brandon Williams <[email protected]>
Authored: Wed Apr 29 16:03:36 2015 -0500
Committer: Brandon Williams <[email protected]>
Committed: Wed Apr 29 16:03:36 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt | 2 +-
 .../gms/GossipDigestSynVerbHandler.java | 8 ----
 src/java/org/apache/cassandra/gms/Gossiper.java | 26 -------------
 .../net/IncomingStreamingConnection.java | 31 ++++++++++++----
 .../cassandra/net/IncomingTcpConnection.java | 39 +++++++++++++-------
 .../apache/cassandra/net/MessagingService.java | 11 +++++-
 6 files changed, 59 insertions(+), 58 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 213f226..3df91ce 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,6 @@
 2.0.15:
  * IncomingTcpConnection thread is not named (CASSANDRA-9262)
- * Ignore gossip SYNs after shutdown (CASSANDRA-9238)
+ * Close incoming connections when MessagingService is stopped (CASSANDRA-9238)
  * Avoid overflow when calculating max sstable size in LCS (CASSANDRA-9235)
  * Make sstable blacklisting work with compression (CASSANDRA-9138)
  * Do not attempt to rebuild indexes if no index accepts any column (CASSANDRA-9196)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java index 4454c46..df74808 100644 --- a/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java +++ b/src/java/org/apache/cassandra/gms/GossipDigestSynVerbHandler.java @@ -36,22 +36,14 @@ public class GossipDigestSynVerbHandler implements IVerbHandler<GossipDigestSyn> public void doVerb(MessageIn<GossipDigestSyn> message, int id) { InetAddress from = message.from; - if (logger.isTraceEnabled()) logger.trace("Received a GossipDigestSynMessage from {}", from); - if (!Gossiper.instance.isEnabled()) { if (logger.isTraceEnabled()) logger.trace("Ignoring GossipDigestSynMessage because gossip is disabled"); return; } - - if (!Gossiper.instance.shouldAckAfterShutdown(from)) - { - logger.debug("Temporarily ignoring SYN from shutdown node {}", from); - return; - } GossipDigestSyn gDigestMessage = message.payload; /* If the message is from a different cluster throw it away. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 4665e74..b77064d 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -103,9 +103,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /* unreachable member set */ private final Map<InetAddress, Long> unreachableEndpoints = new ConcurrentHashMap<InetAddress, Long>(); - - /* shutdown member set */ - private final Map<InetAddress, Long> shutdownEndpoints = new ConcurrentHashMap<InetAddress, Long>(); /* initial seeds for joining the cluster */ private final Set<InetAddress> seeds = new ConcurrentSkipListSet<InetAddress>(inetcomparator); @@ -305,24 +302,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return 0L; } - public boolean shouldAckAfterShutdown(InetAddress endpoint) - { - Long shutdownTimestamp = shutdownEndpoints.get(endpoint); - Integer shutdownAnnounce = Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000); - // Do not temporarily answer to SYN messages coming from shutdown nodes, to avoid reconnecting: - if (shutdownTimestamp != null && (System.currentTimeMillis() - shutdownTimestamp) < (shutdownAnnounce * 2)) - { - return false; - } - // Otherwise, if allowed to answer, remove the node from the shutdown set - // (and do this only here, rather than in sendGossip too, to avoid racing between different calling threads): - else - { - shutdownEndpoints.remove(endpoint); - return true; - } - } - private boolean isShutdown(InetAddress endpoint) { EndpointState epState = endpointStateMap.get(endpoint); @@ -371,7 +350,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean return; 
epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.shutdown(true)); epState.getHeartBeatState().forceHighestPossibleVersionUnsafe(); - shutdownEndpoints.put(endpoint, System.currentTimeMillis()); markDead(endpoint, epState); FailureDetector.instance.forceConviction(endpoint); } @@ -644,10 +622,6 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean /* Generate a random number from 0 -> size */ int index = (size == 1) ? 0 : random.nextInt(size); InetAddress to = liveEndpoints.get(index); - Long shutdownTimestamp = shutdownEndpoints.get(to); - Integer shutdownAnnounce = Integer.getInteger("cassandra.shutdown_announce_in_ms", 2000); - if (shutdownTimestamp != null && (System.currentTimeMillis() - shutdownTimestamp) < (shutdownAnnounce * 2)) - return false; if (logger.isTraceEnabled()) logger.trace("Sending a GossipDigestSyn to {} ...", to); MessagingService.instance().sendOneWay(message, to); http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java index 20392f2..e37299f 100644 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java @@ -17,10 +17,12 @@ */ package org.apache.cassandra.net; +import java.io.Closeable; import java.io.DataInput; import java.io.DataInputStream; import java.io.IOException; import java.net.Socket; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,18 +34,20 @@ import org.apache.cassandra.streaming.messages.StreamMessage; /** * Thread to consume stream init messages. 
*/ -public class IncomingStreamingConnection extends Thread +public class IncomingStreamingConnection extends Thread implements Closeable { private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class); private final int version; private final Socket socket; + private final Set<Closeable> group; - public IncomingStreamingConnection(int version, Socket socket) + public IncomingStreamingConnection(int version, Socket socket, Set<Closeable> group) { super("STREAM-INIT-" + socket.getRemoteSocketAddress()); this.version = version; this.socket = socket; + this.group = group; } @Override @@ -67,14 +71,27 @@ public class IncomingStreamingConnection extends Thread catch (IOException e) { logger.debug("IOException reading from socket; closing", e); - try + close(); + } + } + + @Override + public void close() + { + try + { + if (!socket.isClosed()) { socket.close(); } - catch (IOException e2) - { - logger.debug("error closing socket", e2); - } + } + catch (IOException e) + { + logger.debug("Error closing socket", e); + } + finally + { + group.remove(this); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index fbdd221..b61e82e 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -21,6 +21,7 @@ import java.io.*; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,21 +31,23 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.UnknownColumnFamilyException; import org.apache.cassandra.gms.Gossiper; -public class 
IncomingTcpConnection extends Thread +public class IncomingTcpConnection extends Thread implements Closeable { private static final Logger logger = LoggerFactory.getLogger(IncomingTcpConnection.class); private final int version; private final boolean compressed; private final Socket socket; + private final Set<Closeable> group; public InetAddress from; - public IncomingTcpConnection(int version, boolean compressed, Socket socket) + public IncomingTcpConnection(int version, boolean compressed, Socket socket, Set<Closeable> group) { super("MessagingService-Incoming-" + socket.getInetAddress()); this.version = version; this.compressed = compressed; this.socket = socket; + this.group = group; if (DatabaseDescriptor.getInternodeRecvBufferSize() != null) { try @@ -91,6 +94,26 @@ public class IncomingTcpConnection extends Thread close(); } } + + @Override + public void close() + { + try + { + if (!socket.isClosed()) + { + socket.close(); + } + } + catch (IOException e) + { + logger.debug("Error closing socket", e); + } + finally + { + group.remove(this); + } + } private void receiveMessages() throws IOException { @@ -162,16 +185,4 @@ public class IncomingTcpConnection extends Thread } return message.from; } - - private void close() - { - try - { - socket.close(); - } - catch (IOException e) - { - logger.debug("Error closing socket", e); - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/cefaa4eb/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 117bd3c..d570faf 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -33,6 +33,7 @@ import javax.management.ObjectName; import com.google.common.base.Function; import com.google.common.collect.Lists; +import 
com.google.common.collect.Sets; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; @@ -885,6 +886,7 @@ public final class MessagingService implements MessagingServiceMBean private static class SocketThread extends Thread { private final ServerSocket server; + private final Set<Closeable> connections = Sets.newConcurrentHashSet(); SocketThread(ServerSocket server, String name) { @@ -919,9 +921,10 @@ public final class MessagingService implements MessagingServiceMBean socket.setSoTimeout(0); Thread thread = isStream - ? new IncomingStreamingConnection(version, socket) - : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket); + ? new IncomingStreamingConnection(version, socket, connections) + : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket, connections); thread.start(); + connections.add((Closeable) thread); } catch (AsynchronousCloseException e) { @@ -947,6 +950,10 @@ public final class MessagingService implements MessagingServiceMBean { logger.debug("Closing accept() thread"); server.close(); + for (Closeable connection : connections) + { + connection.close(); + } } private boolean authenticate(Socket socket)
