Don't reset nodes' versions when closing IncomingTcpConnections. Patch by Aleksey Yeschenko; reviewed by Marcus Eriksson for CASSANDRA-7734.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e70f4f81 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e70f4f81 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e70f4f81 Branch: refs/heads/trunk Commit: e70f4f81a223f6bad9823673b770581cbfcda34c Parents: f5a4321 Author: Aleksey Yeschenko <[email protected]> Authored: Tue Oct 7 18:37:09 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Oct 7 18:37:09 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ .../cassandra/net/IncomingTcpConnection.java | 28 +++++++------------- .../apache/cassandra/net/MessagingService.java | 2 +- 3 files changed, 13 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e70f4f81/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a249d83..188f951 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 2.0.11: + * Don't reset nodes' versions when closing IncomingTcpConnections + (CASSANDRA-7734) * Record the real messaging version in all cases in OutboundTcpConnection (CASSANDRA-8057) * SSL does not work in cassandra-cli (CASSANDRA-7899) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e70f4f81/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index f1e2193..3296cfd 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -69,9 +69,9 @@ public class IncomingTcpConnection extends Thread try { if (version < MessagingService.VERSION_12) - 
handleLegacyVersion(); - else - handleModernVersion(); + throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0"); + + receiveMessages(); } catch (EOFException e) { @@ -92,15 +92,19 @@ public class IncomingTcpConnection extends Thread } } - private void handleModernVersion() throws IOException + private void receiveMessages() throws IOException { + // handshake (true) endpoint versions DataOutputStream out = new DataOutputStream(socket.getOutputStream()); out.writeInt(MessagingService.current_version); out.flush(); - DataInputStream in = new DataInputStream(socket.getInputStream()); int maxVersion = in.readInt(); + from = CompactEndpointSerializationHelper.deserialize(in); + // record the (true) version of the endpoint + MessagingService.instance().setVersion(from, maxVersion); + logger.debug("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from)); if (compressed) { @@ -112,7 +116,6 @@ public class IncomingTcpConnection extends Thread in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); } - logger.debug("Max version for {} is {}", from, maxVersion); if (version > MessagingService.current_version) { // save the endpoint so gossip will reconnect to it @@ -120,8 +123,6 @@ public class IncomingTcpConnection extends Thread logger.info("Received messages from newer protocol version {}. 
Ignoring", version); return; } - MessagingService.instance().setVersion(from, maxVersion); - logger.debug("Set version for {} to {} (will use {})", from, maxVersion, Math.min(MessagingService.current_version, maxVersion)); // outbound side will reconnect if necessary to upgrade version while (true) @@ -131,11 +132,6 @@ public class IncomingTcpConnection extends Thread } } - private void handleLegacyVersion() - { - throw new UnsupportedOperationException("Unable to read obsolete message version " + version + "; the earliest version supported is 1.2.0"); - } - private InetAddress receiveMessage(DataInputStream input, int version) throws IOException { int id; @@ -169,17 +165,13 @@ public class IncomingTcpConnection extends Thread private void close() { - // reset version here, since we set when starting an incoming socket - if (from != null) - MessagingService.instance().resetVersion(from); try { socket.close(); } catch (IOException e) { - if (logger.isDebugEnabled()) - logger.debug("error closing socket", e); + logger.debug("Error closing socket", e); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e70f4f81/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 0bb1b17..d2e65d8 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -791,7 +791,7 @@ public final class MessagingService implements MessagingServiceMBean public void resetVersion(InetAddress endpoint) { - logger.debug("Reseting version for {}", endpoint); + logger.debug("Resetting version for {}", endpoint); Integer removed = versions.remove(endpoint); if (removed != null && removed <= VERSION_20) refreshAllNodesAtLeast20();
