Updated Branches: refs/heads/cassandra-2.0 ecec863d1 -> 53af91e65 refs/heads/trunk 90e585dde -> 6635cde3a
cleanup + debug logging Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1d605281 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1d605281 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1d605281 Branch: refs/heads/cassandra-2.0 Commit: 1d6052810df9363ed8dee308444b8466be112b5d Parents: ecec863 Author: Jonathan Ellis <[email protected]> Authored: Tue Dec 17 16:34:53 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Tue Dec 17 16:37:05 2013 -0600 ---------------------------------------------------------------------- .../apache/cassandra/net/MessagingService.java | 48 +++++++++----------- 1 file changed, 22 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1d605281/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 20cad82..b2c8014 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -37,7 +37,6 @@ import com.google.common.collect.Lists; import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.concurrent.TracingAwareExecutorService; @@ -391,7 +390,7 @@ public final class MessagingService implements MessagingServiceMBean public void listen(InetAddress localEp) throws ConfigurationException { callbacks.reset(); // hack to allow tests to stop/restart MS - for (ServerSocket ss : getServerSocket(localEp)) + for (ServerSocket ss : getServerSockets(localEp)) { SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp); th.start(); @@ -400,7 +399,7 @@ public final class MessagingService implements MessagingServiceMBean listenGate.signalAll(); } - private List<ServerSocket> getServerSocket(InetAddress localEp) throws ConfigurationException + private List<ServerSocket> getServerSockets(InetAddress localEp) throws ConfigurationException { final List<ServerSocket> ss = new ArrayList<ServerSocket>(2); if (DatabaseDescriptor.getServerEncryptionOptions().internode_encryption != ServerEncryptionOptions.InternodeEncryption.none) @@ -834,36 +833,31 @@ public final class MessagingService implements MessagingServiceMBean try { socket = server.accept(); - if (authenticate(socket)) - { - socket.setKeepAlive(true); - // determine the connection type to decide whether to buffer - DataInputStream in = new DataInputStream(socket.getInputStream()); - MessagingService.validateMagic(in.readInt()); - int header = in.readInt(); - boolean isStream = MessagingService.getBits(header, 3, 1) == 1; - int version = MessagingService.getBits(header, 15, 8); - logger.debug("Connection version {} from {}", version, socket.getInetAddress()); - - if (isStream) - { - new IncomingStreamingConnection(version, socket).start(); - } - else - { - boolean compressed = MessagingService.getBits(header, 2, 1) == 1; - new IncomingTcpConnection(version, compressed, socket).start(); - } - } - else + if (!authenticate(socket)) { + logger.debug("remote failed to authenticate"); socket.close(); + continue; } + + socket.setKeepAlive(true); + // determine the connection type to decide whether to buffer + DataInputStream in = new DataInputStream(socket.getInputStream()); + MessagingService.validateMagic(in.readInt()); + int header = in.readInt(); + boolean isStream = MessagingService.getBits(header, 3, 1) == 1; + int version = MessagingService.getBits(header, 15, 8); + logger.debug("Connection version {} from {}", version, socket.getInetAddress()); + + Thread thread = isStream + ? new IncomingStreamingConnection(version, socket) + : new IncomingTcpConnection(version, MessagingService.getBits(header, 2, 1) == 1, socket); + thread.start(); } catch (AsynchronousCloseException e) { // this happens when another thread calls close(). - logger.info("MessagingService shutting down server thread"); + logger.debug("Asynchronous close seen by server thread"); break; } catch (ClosedChannelException e) @@ -877,10 +871,12 @@ public final class MessagingService implements MessagingServiceMBean FileUtils.closeQuietly(socket); } } + logger.info("MessagingService has terminated the accept() thread"); } void close() throws IOException { + logger.debug("Closing accept() thread"); server.close(); }
