Update MessagingService protocol to allow version handshake with broadcast address identification. Patch by jbellis; reviewed by brandonwilliams and Marcus Eriksson for CASSANDRA-4311.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c82a9d9b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c82a9d9b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c82a9d9b Branch: refs/heads/trunk Commit: c82a9d9b96667cc2afeb2609fbff955b4232fc2b Parents: 15577ba Author: Jonathan Ellis <[email protected]> Authored: Wed Jun 6 12:33:10 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Thu Jun 14 18:03:46 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/commitlog/CommitLog.java | 1 - src/java/org/apache/cassandra/gms/Gossiper.java | 4 +- .../cassandra/net/IncomingTcpConnection.java | 157 +++++++++----- .../org/apache/cassandra/net/MessagingService.java | 22 +-- .../cassandra/net/OutboundTcpConnection.java | 122 +++++++----- .../cassandra/net/OutboundTcpConnectionPool.java | 19 ++- .../apache/cassandra/net/io/SerializerType.java | 26 --- 8 files changed, 197 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b7d4861..2eb1b57 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 1.2-dev + * update MS protocol with a version handshake + broadcast address id + (CASSANDRA-4311) * multithreaded hint replay (CASSANDRA-4189) * add inter-node message compression (CASSANDRA-3127) * enforce 1m min keycache for auto (CASSANDRA-4306) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index 080cd22..6649ae4 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -24,7 +24,6 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.net.MessagingService; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/gms/Gossiper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index ecc3453..06dafe7 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -200,10 +200,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean subscribers.remove(subscriber); } - public void setVersion(InetAddress address, int version) + public Integer setVersion(InetAddress address, int version) { logger.debug("Setting version {} for {}", version, address); - versions.put(address, version); + return versions.put(address, version); } public void resetVersion(InetAddress endpoint) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java index 620fbdb..e808c7e 100644 --- a/src/java/org/apache/cassandra/net/IncomingTcpConnection.java +++ b/src/java/org/apache/cassandra/net/IncomingTcpConnection.java @@ -17,15 +17,13 @@ */ package org.apache.cassandra.net; -import 
java.io.BufferedInputStream; -import java.io.DataInputStream; -import java.io.EOFException; -import java.io.IOException; +import java.io.*; import java.net.InetAddress; import java.net.Socket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.xerial.snappy.SnappyInputStream; import org.apache.cassandra.gms.Gossiper; @@ -57,61 +55,19 @@ public class IncomingTcpConnection extends Thread try { // determine the connection type to decide whether to buffer - DataInputStream input = new DataInputStream(socket.getInputStream()); - MessagingService.validateMagic(input.readInt()); - int header = input.readInt(); - boolean compressed = MessagingService.getBits(header, 2, 1) == 1; + DataInputStream in = new DataInputStream(socket.getInputStream()); + MessagingService.validateMagic(in.readInt()); + int header = in.readInt(); boolean isStream = MessagingService.getBits(header, 3, 1) == 1; int version = MessagingService.getBits(header, 15, 8); + logger.debug("Connection version {} from {}", version, socket.getInetAddress()); + if (isStream) - { - if (version == MessagingService.current_version) - { - int size = input.readInt(); - byte[] headerBytes = new byte[size]; - input.readFully(headerBytes); - stream(StreamHeader.serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input); - } - else - { - // streaming connections are per-session and have a fixed version. we can't do anything with a wrong-version stream connection, so drop it. - logger.error("Received stream using protocol version {} (my version {}). Terminating connection", - version, MessagingService.current_version); - } - // We are done with this connection.... - return; - } - - // we should buffer - input = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); - // Receive the first message to set the version. - from = receiveMessage(input, version); // why? 
see => CASSANDRA-4099 - logger.debug("Version for {} is {}", from, version); - if (version > MessagingService.current_version) - { - // save the endpoint so gossip will reconnect to it - Gossiper.instance.addSavedEndpoint(from); - logger.info("Received " + (isStream ? "streaming " : "") + "connection from newer protocol version. Ignoring"); - return; - } - Gossiper.instance.setVersion(from, version); - logger.debug("set version for {} to {}", from, version); - if (compressed) - { - logger.debug("Upgrading incoming connection to be compressed"); - input = new DataInputStream(new SnappyInputStream(input)); - } - // loop to get the next message. - while (true) - { - // prepare to read the next message - MessagingService.validateMagic(input.readInt()); - header = input.readInt(); - assert isStream == (MessagingService.getBits(header, 3, 1) == 1) : "Connections cannot change type: " + isStream; - version = MessagingService.getBits(header, 15, 8); - logger.trace("Version is now {}", version); - receiveMessage(input, version); - } + handleStream(in, version); + else if (version < MessagingService.VERSION_12) + handleLegacyVersion(version); + else + handleModernVersion(version, header); } catch (EOFException e) { @@ -128,6 +84,95 @@ public class IncomingTcpConnection extends Thread } } + private void handleModernVersion(int version, int header) throws IOException + { + DataOutputStream out = new DataOutputStream(socket.getOutputStream()); + out.writeInt(MessagingService.current_version); + out.flush(); + + DataInputStream in = new DataInputStream(socket.getInputStream()); + int maxVersion = in.readInt(); + from = CompactEndpointSerializationHelper.deserialize(in); + boolean compressed = MessagingService.getBits(header, 2, 1) == 1; + + if (compressed) + { + logger.debug("Upgrading incoming connection to be compressed"); + in = new DataInputStream(new SnappyInputStream(socket.getInputStream())); + } + else + { + in = new DataInputStream(new 
BufferedInputStream(socket.getInputStream(), 4096)); + } + + logger.debug("Max version for {} is {}", from, maxVersion); + if (version > MessagingService.current_version) + { + // save the endpoint so gossip will reconnect to it + Gossiper.instance.addSavedEndpoint(from); + logger.info("Received messages from newer protocol version {}. Ignoring", version); + return; + } + Gossiper.instance.setVersion(from, Math.min(MessagingService.current_version, maxVersion)); + logger.debug("set version for {} to {}", from, Math.min(MessagingService.current_version, maxVersion)); + // outbound side will reconnect if necessary to upgrade version + + while (true) + { + MessagingService.validateMagic(in.readInt()); + receiveMessage(in, version); + } + } + + private void handleLegacyVersion(int version) throws IOException + { + DataInputStream in = new DataInputStream(new BufferedInputStream(socket.getInputStream(), 4096)); + + from = receiveMessage(in, version); // why? see => CASSANDRA-4099 + logger.debug("Version for {} is {}", from, version); + if (version > MessagingService.current_version) + { + // save the endpoint so gossip will reconnect to it + Gossiper.instance.addSavedEndpoint(from); + logger.info("Received messages from newer protocol version. 
Ignoring"); + return; + } + int lastVersion = Gossiper.instance.setVersion(from, version); + logger.debug("set version for {} to {}", from, version); + if (lastVersion < version) + { + logger.debug("breaking outbound connections to force version upgrade"); + MessagingService.instance().getConnectionPool(from).resetToNewerVersion(version); + } + + while (true) + { + MessagingService.validateMagic(in.readInt()); + int header = in.readInt(); // legacy protocol re-sends header for each message + assert !(MessagingService.getBits(header, 3, 1) == 1) : "Non-stream connection cannot change to stream"; + version = MessagingService.getBits(header, 15, 8); + logger.trace("Version is now {}", version); + receiveMessage(in, version); + } + } + + private void handleStream(DataInputStream input, int version) throws IOException + { + if (version == MessagingService.current_version) + { + int size = input.readInt(); + byte[] headerBytes = new byte[size]; + input.readFully(headerBytes); + stream(StreamHeader.serializer.deserialize(new DataInputStream(new FastByteArrayInputStream(headerBytes)), version), input); + } + else + { + // streaming connections are per-session and have a fixed version. we can't do anything with a wrong-version stream connection, so drop it. + logger.error("Received stream using protocol version {} (my version {}). 
Terminating connection", + version, MessagingService.current_version); + } + } + private InetAddress receiveMessage(DataInputStream input, int version) throws IOException { if (version <= MessagingService.VERSION_11) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 6dd954d..ed9e5b3 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -54,7 +54,6 @@ import org.apache.cassandra.gms.GossipDigestSyn; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.ILatencySubscriber; -import org.apache.cassandra.net.io.SerializerType; import org.apache.cassandra.net.sink.SinkManager; import org.apache.cassandra.security.SSLFactory; import org.apache.cassandra.service.AntiEntropyService; @@ -78,8 +77,6 @@ public final class MessagingService implements MessagingServiceMBean public static final int VERSION_12 = 5; public static final int current_version = VERSION_12; - static SerializerType serializerType = SerializerType.BINARY; - /** we preface every message with this number so the recipient can validate the sender is sane */ static final int PROTOCOL_MAGIC = 0xCA552DFA; @@ -721,34 +718,21 @@ public final class MessagingService implements MessagingServiceMBean throw new IOException("invalid protocol header"); } - public static int getBits(int x, int p, int n) + public static int getBits(int packed, int start, int count) { - return x >>> (p + 1) - n & ~(-1 << n); + return packed >>> (start + 1) - count & ~(-1 << count); } public ByteBuffer constructStreamHeader(StreamHeader streamHeader, boolean compress, int version) { - /* - Setting up the 
protocol header. This is 4 bytes long - represented as an integer. The first 2 bits indicate - the serializer type. The 3rd bit indicates if compression - is turned on or off. It is turned off by default. The 4th - bit indicates if we are in streaming mode. It is turned off - by default. The following 4 bits are reserved for future use. - The next 8 bits indicate a version number. Remaining 15 bits - are not used currently. - */ int header = 0; - // Setting up the serializer bit - header |= serializerType.ordinal(); // set compression bit. - if ( compress ) + if (compress) header |= 4; // set streaming bit header |= 8; // Setting up the version bit header |= (version << 8); - /* Finished the protocol header setup */ /* Adding the StreamHeader which contains the session Id along * with the pendingfile info for the stream. http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java index 76f496c..a123072 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java @@ -18,6 +18,7 @@ package org.apache.cassandra.net; import java.io.BufferedOutputStream; +import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.InetAddress; @@ -28,6 +29,9 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; import org.xerial.snappy.SnappyOutputStream; import org.apache.cassandra.config.Config; @@ -53,21 +57,19 @@ public class OutboundTcpConnection extends Thread private Socket socket; private volatile long completed; private final AtomicLong 
dropped = new AtomicLong(); - private boolean isUpgraded = false; - private boolean writeToLocalDC; + private int targetVersion; public OutboundTcpConnection(OutboundTcpConnectionPool pool) { super("WRITE-" + pool.endPoint()); - this.writeToLocalDC = isSameDC(pool.endPoint()); this.poolReference = pool; } - private static boolean isSameDC(InetAddress targetHost) + private static boolean isLocalDC(InetAddress targetHost) { String remoteDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(targetHost); - String thisDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress()); - return remoteDC.equals(thisDC); + String localDC = DatabaseDescriptor.getEndpointSnitch().getDatacenter(DatabaseDescriptor.getRpcAddress()); + return remoteDC.equals(localDC); } public void enqueue(MessageOut<?> message, String id) @@ -95,6 +97,11 @@ public class OutboundTcpConnection extends Thread enqueue(CLOSE_SENTINEL, null); } + public int getTargetVersion() + { + return targetVersion; + } + public void run() { while (true) @@ -149,13 +156,11 @@ public class OutboundTcpConnection extends Thread return dropped.get(); } - private boolean shouldUpgradeConnection() + private boolean shouldCompressConnection() { - if(Gossiper.instance.getVersion(poolReference.endPoint()) >= MessagingService.current_version && - (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all || - (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !writeToLocalDC))) - return true; - return false; + // assumes version >= 1.2 + return DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.all + || (DatabaseDescriptor.internodeCompression() == Config.InternodeCompression.dc && !isLocalDC(poolReference.endPoint())); } private void writeConnected(MessageOut<?> message, String id) @@ -168,13 +173,6 @@ public class OutboundTcpConnection extends Thread { out.flush(); } - if(!isUpgraded && shouldUpgradeConnection()) - { - 
out.flush(); - logger.debug("Upgrading OutputStream to be compressed"); - out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream()))); - isUpgraded = true; - } } catch (Exception e) { @@ -189,41 +187,37 @@ public class OutboundTcpConnection extends Thread public void write(MessageOut<?> message, String id, DataOutputStream out) throws IOException { - write(message, id, out, Gossiper.instance.getVersion(poolReference.endPoint()), shouldUpgradeConnection()); + write(message, id, out, targetVersion); } - public static void write(MessageOut message, String id, DataOutputStream out, int version, boolean compressionEnabled) throws IOException + public static void write(MessageOut message, String id, DataOutputStream out, int version) throws IOException { - /* - Setting up the protocol header. This is 4 bytes long - represented as an integer. The first 2 bits indicate - the serializer type. The 3rd bit indicates if compression - is turned on or off. It is turned off by default. The 4th - bit indicates if we are in streaming mode. It is turned off - by default. The 5th-8th bits are reserved for future use. - The next 8 bits indicate a version number. Remaining 15 bits - are not used currently. - */ - int header = 0; - // Setting up the serializer bit - header |= MessagingService.serializerType.ordinal(); - // set compression bit. - if (compressionEnabled) - header |= 4; - // Setting up the version bit - header |= (version << 8); - out.writeInt(MessagingService.PROTOCOL_MAGIC); - out.writeInt(header); - + if (version < MessagingService.VERSION_12) + writeHeader(out, version, false); // 0.8 included a total message size int. 1.0 doesn't need it but expects it to be there. 
- if (version <= MessagingService.VERSION_11) + if (version < MessagingService.VERSION_12) out.writeInt(-1); out.writeUTF(id); message.serialize(out, version); } + private static void writeHeader(DataOutputStream out, int version, boolean compressionEnabled) throws IOException + { + // 2 bits: unused. used to be "serializer type," which was always Binary + // 1 bit: compression + // 1 bit: streaming mode + // 3 bits: unused + // 8 bits: version + // 15 bits: unused + int header = 0; + if (compressionEnabled) + header |= 4; + header |= (version << 8); + out.writeInt(header); + } + private void disconnect() { if (socket != null) @@ -244,9 +238,11 @@ public class OutboundTcpConnection extends Thread private boolean connect() { - isUpgraded = false; if (logger.isDebugEnabled()) logger.debug("attempting to connect to " + poolReference.endPoint()); + + targetVersion = Gossiper.instance.getVersion(poolReference.endPoint()); + long start = System.currentTimeMillis(); while (System.currentTimeMillis() < start + DatabaseDescriptor.getRpcTimeout()) { @@ -256,6 +252,41 @@ public class OutboundTcpConnection extends Thread socket.setKeepAlive(true); socket.setTcpNoDelay(true); out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream(), 4096)); + + if (targetVersion >= MessagingService.VERSION_12) + { + out.writeInt(MessagingService.PROTOCOL_MAGIC); + writeHeader(out, targetVersion, shouldCompressConnection()); + out.flush(); + + DataInputStream in = new DataInputStream(socket.getInputStream()); + int maxTargetVersion = in.readInt(); + if (targetVersion > maxTargetVersion) + { + logger.debug("Target max version is {}; will reconnect with that version", maxTargetVersion); + Gossiper.instance.setVersion(poolReference.endPoint(), maxTargetVersion); + disconnect(); + return false; + } + + if (targetVersion < maxTargetVersion && targetVersion < MessagingService.current_version) + { + logger.debug("Detected higher max version {} (using {}); will reconnect when queued 
messages are done", + maxTargetVersion, targetVersion); + Gossiper.instance.setVersion(poolReference.endPoint(), Math.min(MessagingService.current_version, maxTargetVersion)); + softCloseSocket(); + } + + out.writeInt(MessagingService.current_version); + CompactEndpointSerializationHelper.serialize(FBUtilities.getBroadcastAddress(), out); + if (shouldCompressConnection()) + { + out.flush(); + logger.debug("Upgrading OutputStream to be compressed"); + out = new DataOutputStream(new SnappyOutputStream(new BufferedOutputStream(socket.getOutputStream()))); + } + } + return true; } catch (IOException e) @@ -273,9 +304,6 @@ public class OutboundTcpConnection extends Thread } } } - // nodes can change dc during the lifetime of this OutboundTcpConnection, but we cannot upgrade a - // connection mid-flight, so it is safe to set here and dont change until the next connect() - this.writeToLocalDC = isSameDC(poolReference.endPoint()); return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java index 45b7627..ce51cf0 100644 --- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java +++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java @@ -61,10 +61,19 @@ public class OutboundTcpConnectionPool : cmdCon; } - synchronized void reset() + void reset() { - for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon }) - con.closeSocket(); + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + conn.closeSocket(); + } + + public void resetToNewerVersion(int version) + { + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + { + if (version > conn.getTargetVersion()) + 
conn.softCloseSocket(); + } } /** @@ -75,8 +84,8 @@ public class OutboundTcpConnectionPool public void reset(InetAddress remoteEP) { resetedEndpoint = remoteEP; - for (OutboundTcpConnection con : new OutboundTcpConnection[] { cmdCon, ackCon }) - con.softCloseSocket(); + for (OutboundTcpConnection conn : new OutboundTcpConnection[] { cmdCon, ackCon }) + conn.softCloseSocket(); } public Socket newSocket() throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/c82a9d9b/src/java/org/apache/cassandra/net/io/SerializerType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/io/SerializerType.java b/src/java/org/apache/cassandra/net/io/SerializerType.java deleted file mode 100644 index ac5526f..0000000 --- a/src/java/org/apache/cassandra/net/io/SerializerType.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net.io; - -public enum SerializerType -{ - BINARY, - JAVA, - XML, - JSON -}
