Updated Branches: refs/heads/trunk d5fc1932e -> f1711794c
Simplify and generalize startup message in binary protocol patch by slebresne; reviewed by thepaul for CASSANDRA-4539 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f1711794 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f1711794 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f1711794 Branch: refs/heads/trunk Commit: f1711794cf911c349a3b60e0104064adb8124148 Parents: d5fc193 Author: Sylvain Lebresne <[email protected]> Authored: Fri Sep 7 10:55:13 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Sep 7 10:55:13 2012 +0200 ---------------------------------------------------------------------- doc/native_protocol.spec | 30 +++--- .../org/apache/cassandra/transport/CBUtil.java | 48 +++++++++ .../org/apache/cassandra/transport/Client.java | 7 +- .../apache/cassandra/transport/SimpleClient.java | 7 +- .../transport/messages/OptionsMessage.java | 20 +++- .../transport/messages/StartupMessage.java | 80 +++------------ .../transport/messages/SupportedMessage.java | 23 ++--- 7 files changed, 109 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index d19963b..1de7a95 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -161,6 +161,11 @@ Table of Contents will be described when this is used. [option list] A [short] n, followed by n [option]. + [string map] A [short] n, followed by n pair <k><v> where <k> and <v> + are [string]. + [string multimap] A [short] n, followed by n pair <k><v> where <k> is a + [string] and <v> is a [string list]. + 4. 
Messages @@ -176,19 +181,16 @@ Table of Contents (in which case credentials will need to be provided using CREDENTIALS). This must be the first message of the connection, except for OPTIONS that can - be sent before to find out the option supported by the server. Once the + be sent before to find out the options supported by the server. Once the connection has been initialized, a client should not send any more STARTUP message. - The body is defined as: - <version><options> - where: - - <version> is a [string] representing the version of the CQL version to use. - Currently the only version supported is 3.0.0. Note that this is different - from the protocol version. - - <options> is an [option list]. Valid option ids are: - 0x0001 Compression: the value is a [string] representing the - algorithm to use (See section 5). + The body is a [string map] of options. Possible options are: + - "CQL_VERSION": the version of CQL to use. This option is mandatory and + currently, the only version supported is "3.0.0". Note that this is + different from the protocol version. + - "COMPRESSION": the compression algorithm to use for frames (See section 5). + This is optional, if not specified no compression will be used. 4.1.2. CREDENTIALS @@ -284,10 +286,8 @@ Table of Contents Indicates which startup options are supported by the server. This message comes as a response to an OPTIONS message. - The body of a SUPPORTED message is a [string list] indicating which CQL - version the server support, followed by a second [string list] indicating - which compression algorithm is supported, if any (at the time of this writing, - only snappy compression is available if the library is in the classpath). + The body of a SUPPORTED message is a [string multimap]. This multimap gives + for each of the supported STARTUP options, the list of supported values. 4.2.5. RESULT @@ -397,7 +397,7 @@ Table of Contents use, which is done in the STARTUP message. 
As a consequence, a STARTUP message must never be compressed. However, once the STARTUP frame has been received by the server can be compressed (including the response to the STARTUP - request). Frame do not have to compressed however, even if compression has + request). Frame do not have to be compressed however, even if compression has been agreed upon (a server may only compress frame above a certain size at its discretion). A frame body should be compressed if and only if the compressed flag (see Section 2.2) is set. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/CBUtil.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/CBUtil.java b/src/java/org/apache/cassandra/transport/CBUtil.java index 791f097..44ae64d 100644 --- a/src/java/org/apache/cassandra/transport/CBUtil.java +++ b/src/java/org/apache/cassandra/transport/CBUtil.java @@ -20,7 +20,9 @@ package org.apache.cassandra.transport; import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -128,6 +130,52 @@ public abstract class CBUtil cb.writeBytes(stringToCB(str)); } + public static Map<String, String> readStringMap(ChannelBuffer cb) + { + int length = cb.readUnsignedShort(); + Map<String, String> m = new HashMap<String, String>(length); + for (int i = 0; i < length; i++) + { + String k = readString(cb).toUpperCase(); + String v = readString(cb); + m.put(k, v); + } + return m; + } + + public static void writeStringMap(ChannelBuffer cb, Map<String, String> m) + { + cb.writeShort(m.size()); + for (Map.Entry<String, String> entry : m.entrySet()) + { + cb.writeBytes(stringToCB(entry.getKey())); + cb.writeBytes(stringToCB(entry.getValue())); + } + } + + public static 
Map<String, List<String>> readStringToStringListMap(ChannelBuffer cb) + { + int length = cb.readUnsignedShort(); + Map<String, List<String>> m = new HashMap<String, List<String>>(length); + for (int i = 0; i < length; i++) + { + String k = readString(cb).toUpperCase(); + List<String> v = readStringList(cb); + m.put(k, v); + } + return m; + } + + public static void writeStringToStringListMap(ChannelBuffer cb, Map<String, List<String>> m) + { + cb.writeShort(m.size()); + for (Map.Entry<String, List<String>> entry : m.entrySet()) + { + cb.writeBytes(stringToCB(entry.getKey())); + writeStringList(cb, entry.getValue()); + } + } + public static ByteBuffer readValue(ChannelBuffer cb) { int length = cb.readInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/Client.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java index c65522e..2cf815f 100644 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@ -82,17 +82,18 @@ public class Client extends SimpleClient String msgType = iter.next().toUpperCase(); if (msgType.equals("STARTUP")) { - EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class); + Map<String, String> options = new HashMap<String, String>(); + options.put(StartupMessage.CQL_VERSION, "3.0.0"); while (iter.hasNext()) { String next = iter.next(); if (next.toLowerCase().equals("snappy")) { - options.put(StartupMessage.Option.COMPRESSION, "snappy"); + options.put(StartupMessage.COMPRESSION, "snappy"); connection.setCompressor(FrameCompressor.SnappyCompressor.instance); } } - return new StartupMessage("3.0.0", options); + return new StartupMessage(options); } else if (msgType.equals("QUERY")) { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index 9e0ea2e..66cf5ae 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -74,13 +74,14 @@ public class SimpleClient { establishConnection(); - EnumMap<StartupMessage.Option, Object> options = new EnumMap<StartupMessage.Option, Object>(StartupMessage.Option.class); + Map<String, String> options = new HashMap<String, String>(); + options.put(StartupMessage.CQL_VERSION, "3.0.0"); if (useCompression) { - options.put(StartupMessage.Option.COMPRESSION, "snappy"); + options.put(StartupMessage.COMPRESSION, "snappy"); connection.setCompressor(FrameCompressor.SnappyCompressor.instance); } - execute(new StartupMessage("3.0.0", options)); + execute(new StartupMessage(options)); } protected void establishConnection() throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java index 1a028de..cecead2 100644 --- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java @@ -17,6 +17,11 @@ */ package org.apache.cassandra.transport.messages; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -54,11 +59,18 @@ public class OptionsMessage 
extends Message.Request public Message.Response execute() { - SupportedMessage supported = new SupportedMessage(); - supported.cqlVersions.add(QueryProcessor.CQL_VERSION.toString()); + List<String> cqlVersions = new ArrayList<String>(); + cqlVersions.add(QueryProcessor.CQL_VERSION.toString()); + + List<String> compressions = new ArrayList<String>(); if (FrameCompressor.SnappyCompressor.instance != null) - supported.compressions.add("snappy"); - return supported; + compressions.add("snappy"); + + Map<String, List<String>> supported = new HashMap<String, List<String>>(); + supported.put(StartupMessage.CQL_VERSION, cqlVersions); + supported.put(StartupMessage.COMPRESSION, compressions); + + return new SupportedMessage(supported); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index fc28b69..23199f1 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -34,83 +34,29 @@ import org.apache.cassandra.utils.SemanticVersion; */ public class StartupMessage extends Message.Request { - public enum Option implements OptionCodec.Codecable<Option> - { - COMPRESSION(1); - - private final int id; - - private Option(int id) - { - this.id = id; - } - - public int getId() - { - return id; - } - - public Object readValue(ChannelBuffer cb) - { - switch (this) - { - case COMPRESSION: - return CBUtil.readString(cb); - default: - throw new AssertionError(); - } - } - - public void writeValue(Object value, ChannelBuffer cb) - { - switch (this) - { - case COMPRESSION: - assert value instanceof String; - cb.writeBytes(CBUtil.stringToCB((String)value)); - break; - } - } - - 
public int serializedValueSize(Object value) - { - switch (this) - { - case COMPRESSION: - return 2 + ((String)value).getBytes(Charsets.UTF_8).length; - default: - throw new AssertionError(); - } - } - } - - private static OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class); + public static final String CQL_VERSION = "CQL_VERSION"; + public static final String COMPRESSION = "COMPRESSION"; public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>() { public StartupMessage decode(ChannelBuffer body) { - String verString = CBUtil.readString(body); - - Map<Option, Object> options = optionCodec.decode(body); - return new StartupMessage(verString, options); + return new StartupMessage(CBUtil.readStringMap(body)); } public ChannelBuffer encode(StartupMessage msg) { - ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion); - ChannelBuffer ocb = optionCodec.encode(msg.options); - return ChannelBuffers.wrappedBuffer(vcb, ocb); + ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); + CBUtil.writeStringMap(cb, msg.options); + return cb; } }; - public final String cqlVersion; - public final Map<Option, Object> options; + public final Map<String, String> options; - public StartupMessage(String cqlVersion, Map<Option, Object> options) + public StartupMessage(Map<String, String> options) { super(Message.Type.STARTUP); - this.cqlVersion = cqlVersion; this.options = options; } @@ -123,13 +69,17 @@ public class StartupMessage extends Message.Request { try { + String cqlVersion = options.get(CQL_VERSION); + if (cqlVersion == null) + throw new ProtocolException("Missing value CQL_VERSION in STARTUP message"); + connection.clientState().setCQLVersion(cqlVersion); if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0) throw new ProtocolException(String.format("CQL version %s is not support by the binary protocol (supported version are >= 3.0.0)", cqlVersion)); - if 
(options.containsKey(Option.COMPRESSION)) + if (options.containsKey(COMPRESSION)) { - String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase(); + String compression = options.get(COMPRESSION).toLowerCase(); if (compression.equals("snappy")) { if (FrameCompressor.SnappyCompressor.instance == null) @@ -156,6 +106,6 @@ public class StartupMessage extends Message.Request @Override public String toString() { - return "STARTUP cqlVersion=" + cqlVersion; + return "STARTUP " + options; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f1711794/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java index 0b1a6b5..fe0fa77 100644 --- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java @@ -19,6 +19,7 @@ package org.apache.cassandra.transport.messages; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; @@ -35,33 +36,23 @@ public class SupportedMessage extends Message.Response { public SupportedMessage decode(ChannelBuffer body) { - List<String> versions = CBUtil.readStringList(body); - List<String> compressions = CBUtil.readStringList(body); - return new SupportedMessage(versions, compressions); + return new SupportedMessage(CBUtil.readStringToStringListMap(body)); } public ChannelBuffer encode(SupportedMessage msg) { ChannelBuffer cb = ChannelBuffers.dynamicBuffer(); - CBUtil.writeStringList(cb, msg.cqlVersions); - CBUtil.writeStringList(cb, msg.compressions); + CBUtil.writeStringToStringListMap(cb, msg.supported); return cb; } }; - public final List<String> cqlVersions; - public final List<String> 
compressions; + public final Map<String, List<String>> supported; - public SupportedMessage() - { - this(new ArrayList<String>(), new ArrayList<String>()); - } - - private SupportedMessage(List<String> cqlVersions, List<String> compressions) + public SupportedMessage(Map<String, List<String>> supported) { super(Message.Type.SUPPORTED); - this.cqlVersions = cqlVersions; - this.compressions = compressions; + this.supported = supported; } public ChannelBuffer encode() @@ -72,6 +63,6 @@ public class SupportedMessage extends Message.Response @Override public String toString() { - return "SUPPORTED versions=" + cqlVersions + " compressions=" + compressions; + return "SUPPORTED " + supported; } }
