Updated Branches: refs/heads/trunk 5ac567628 -> 5a18e3706
Native protocol event don't respect the protocol version patch by slebresne; reviewed by iamaleksey for CASSANDRA-5778 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a18e370 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a18e370 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a18e370 Branch: refs/heads/trunk Commit: 5a18e3706424270439aaacbf8ef9b57547510459 Parents: 5ac5676 Author: Sylvain Lebresne <[email protected]> Authored: Fri Jul 19 17:21:40 2013 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Fri Jul 19 17:21:40 2013 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + doc/native_protocol_v2.spec | 11 +++-- .../apache/cassandra/transport/Connection.java | 19 ++++--- .../org/apache/cassandra/transport/Frame.java | 52 +++++++++++--------- .../org/apache/cassandra/transport/Message.java | 34 +++++-------- .../org/apache/cassandra/transport/Server.java | 11 ++++- .../cassandra/transport/ServerConnection.java | 14 ++---- .../cassandra/transport/SimpleClient.java | 8 +-- .../transport/messages/AuthChallenge.java | 5 +- .../transport/messages/AuthResponse.java | 5 +- .../transport/messages/AuthSuccess.java | 5 +- .../transport/messages/AuthenticateMessage.java | 4 +- .../transport/messages/BatchMessage.java | 4 +- .../transport/messages/CredentialsMessage.java | 4 +- .../transport/messages/ErrorMessage.java | 4 +- .../transport/messages/EventMessage.java | 4 +- .../transport/messages/ExecuteMessage.java | 4 +- .../transport/messages/OptionsMessage.java | 4 +- .../transport/messages/PrepareMessage.java | 4 +- .../transport/messages/QueryMessage.java | 4 +- .../transport/messages/ReadyMessage.java | 4 +- .../transport/messages/RegisterMessage.java | 4 +- .../transport/messages/ResultMessage.java | 28 +++++------ .../transport/messages/StartupMessage.java | 4 +- .../transport/messages/SupportedMessage.java | 4 +- 25 files changed, 126 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6b2e2ba..6fa719f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -12,6 +12,7 @@ * Fix bugs in the native protocol v2 (CASSANDRA-5770) * CAS on 'primary key only' table (CASSANDRA-5715) * Support streaming SSTables of old versions (CASSANDRA-5772) + * Always respect protocol version in native protocol (CASSANDRA-5778) 2.0.0-beta1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/doc/native_protocol_v2.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec index 76d697f..9b6d52c 100644 --- a/doc/native_protocol_v2.spec +++ b/doc/native_protocol_v2.spec @@ -69,10 +69,8 @@ Table of Contents The protocol distinguishes 2 types of frames: requests and responses. Requests are those frame sent by the clients to the server, response are the ones sent - by the server. Note however that while communication are initiated by the - client with the server responding to request, the protocol may likely add - server pushes in the future, so responses does not obligatory come right after - a client request. + by the server. Note however that the protocol supports server pushes (events) + so responses does not necessarily come right after a client request. Note to client implementors: clients library should always assume that the body of a given frame may contain more data than what is described in this @@ -98,6 +96,11 @@ Table of Contents 0x02 Request frame for this protocol version 0x82 Response frame for this protocol version + Please note that the while every message ship with the version, only one version + of messages is accepted on a given connection. In other words, the first message + exchanged (STARTUP) sets the version for the connection for the lifetime of this + connection. + This document describe the version 2 of the protocol. For the changes made since version 1, see Section 9. http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Connection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Connection.java b/src/java/org/apache/cassandra/transport/Connection.java index 67f06a4..a72402f 100644 --- a/src/java/org/apache/cassandra/transport/Connection.java +++ b/src/java/org/apache/cassandra/transport/Connection.java @@ -21,13 +21,19 @@ import org.jboss.netty.channel.Channel; public class Connection { - private volatile FrameCompressor frameCompressor; - private volatile Channel channel; + private final Channel channel; + private final int version; private final Tracker tracker; - public Connection(Tracker tracker) + private volatile FrameCompressor frameCompressor; + + public Connection(Channel channel, int version, Tracker tracker) { + this.channel = channel; + this.version = version; this.tracker = tracker; + + tracker.addConnection(channel, this); } public void setCompressor(FrameCompressor compressor) @@ -45,10 +51,9 @@ public class Connection return tracker; } - public void registerChannel(Channel ch) + public int getVersion() { - channel = ch; - tracker.addConnection(ch, this); + return version; } public Channel channel() @@ -58,7 +63,7 @@ public class Connection public interface Factory { - public Connection newConnection(Tracker tracker); + public Connection newConnection(Channel channel, int version); } public interface Tracker http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 014d512..b19ea33 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -34,7 +34,6 @@ public class Frame { public final Header header; public final ChannelBuffer body; - public final Connection connection; /** * On-wire frame. @@ -47,14 +46,13 @@ public class Frame * | length | * +---------+---------+---------+---------+ */ - private Frame(Header header, ChannelBuffer body, Connection connection) + private Frame(Header header, ChannelBuffer body) { this.header = header; this.body = body; - this.connection = connection; } - public static Frame create(ChannelBuffer fullFrame, Connection connection) + public static Frame create(ChannelBuffer fullFrame) { assert fullFrame.readableBytes() >= Header.LENGTH : String.format("Frame too short (%d bytes = %s)", fullFrame.readableBytes(), @@ -72,13 +70,13 @@ public class Frame version = version & 0x7F; Header header = new Header(version, flags, streamId, Message.Type.fromOpcode(opcode, direction)); - return new Frame(header, fullFrame, connection); + return new Frame(header, fullFrame); } - public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection) + public static Frame create(Message.Type type, int streamId, int version, EnumSet<Header.Flag> flags, ChannelBuffer body) { Header header = new Header(version, flags, streamId, type); - return new Frame(header, body, connection); + return new Frame(header, body); } public static class Header @@ -134,25 +132,19 @@ public class Frame public Frame with(ChannelBuffer newBody) { - return new Frame(header, newBody, connection); + return new Frame(header, newBody); } public static class Decoder extends LengthFieldBasedFrameDecoder { private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB - private final Connection connection; - public Decoder(Connection.Tracker tracker, Connection.Factory factory) - { - super(MAX_FRAME_LENTH, 4, 4, 0, 0, true); - this.connection = factory.newConnection(tracker); - } + private final Connection.Factory factory; - @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception + public Decoder(Connection.Factory factory) { - connection.registerChannel(e.getChannel()); + super(MAX_FRAME_LENTH, 4, 4, 0, 0, true); + this.factory = factory; } @Override @@ -183,7 +175,19 @@ public class Frame { return null; } - return Frame.create(frame, connection); + + Connection connection = (Connection)channel.getAttachment(); + if (connection == null) + { + // First message seen on this channel, attach the connection object + connection = factory.newConnection(channel, version); + channel.setAttachment(connection); + } + else if (connection.getVersion() != version) + { + throw new ProtocolException(String.format("Invalid message version. Got %d but previous messages on this connection had version %d", version, connection.getVersion())); + } + return Frame.create(frame); } catch (CorruptedFrameException e) { @@ -225,11 +229,12 @@ public class Frame assert msg instanceof Frame : "Expecting frame, got " + msg; Frame frame = (Frame)msg; + Connection connection = (Connection)channel.getAttachment(); - if (!frame.header.flags.contains(Header.Flag.COMPRESSED)) + if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || connection == null) return frame; - FrameCompressor compressor = frame.connection.getCompressor(); + FrameCompressor compressor = connection.getCompressor(); if (compressor == null) return frame; @@ -245,12 +250,13 @@ public class Frame assert msg instanceof Frame : "Expecting frame, got " + msg; Frame frame = (Frame)msg; + Connection connection = (Connection)channel.getAttachment(); // Never compress STARTUP messages - if (frame.header.type == Message.Type.STARTUP || frame.connection == null) + if (frame.header.type == Message.Type.STARTUP || connection == null) return frame; - FrameCompressor compressor = frame.connection.getCompressor(); + FrameCompressor compressor = connection.getCompressor(); if (compressor == null) return frame; http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 8c46f9b..557bddf 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -119,7 +119,6 @@ public abstract class Message public final Type type; protected volatile Connection connection; private volatile int streamId; - private volatile int version = Frame.Header.CURRENT_VERSION; protected Message(Type type) { @@ -147,18 +146,7 @@ public abstract class Message return streamId; } - public int getVersion() - { - return version; - } - - public Message setVersion(int version) - { - this.version = version; - return this; - } - - public abstract ChannelBuffer encode(); + public abstract ChannelBuffer encode(int version); public static abstract class Request extends Message { @@ -225,13 +213,12 @@ public abstract class Message { Message message = frame.header.type.codec.decode(frame.body, frame.header.version); message.setStreamId(frame.header.streamId); - message.setVersion(frame.header.version); if (isRequest) { assert message instanceof Request; Request req = (Request)message; - req.attach(frame.connection); + req.attach((Connection)channel.getAttachment()); if (isTracing) req.setTracingRequested(); } @@ -260,7 +247,11 @@ public abstract class Message Message message = (Message)msg; - ChannelBuffer body = message.encode(); + Connection connection = (Connection)channel.getAttachment(); + // The only case the connection can be null is when we send the initial STARTUP message (client side thus) + int version = connection == null ? Frame.Header.CURRENT_VERSION : connection.getVersion(); + + ChannelBuffer body = message.encode(version); EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class); if (message instanceof Response) { @@ -278,7 +269,7 @@ public abstract class Message flags.add(Frame.Header.Flag.TRACING); } - return Frame.create(message.type, message.getStreamId(), message.getVersion(), flags, body, message.connection()); + return Frame.create(message.type, message.getStreamId(), version, flags, body); } } @@ -298,24 +289,23 @@ public abstract class Message { assert request.connection() instanceof ServerConnection; ServerConnection connection = (ServerConnection)request.connection(); - QueryState qstate = connection.validateNewMessage(request.type, request.getVersion(), request.getStreamId()); + QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId()); - logger.debug("Received: {}, v={}", request, request.getVersion()); + logger.debug("Received: {}, v={}", request, connection.getVersion()); Response response = request.execute(qstate); response.setStreamId(request.getStreamId()); - response.setVersion(request.getVersion()); response.attach(connection); connection.applyStateTransition(request.type, response.type); - logger.debug("Responding: {}, v={}", response, response.getVersion()); + logger.debug("Responding: {}, v={}", response, connection.getVersion()); ctx.getChannel().write(response); } catch (Exception ex) { // Don't let the exception propagate to exceptionCaught() if we can help it so that we can assign the right streamID. - ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()).setVersion(request.getVersion())); + ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 31f5df2..0a0a77c 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -66,6 +66,14 @@ public class Server implements CassandraDaemon.Server private final ConnectionTracker connectionTracker = new ConnectionTracker(); + private final Connection.Factory connectionFactory = new Connection.Factory() + { + public Connection newConnection(Channel channel, int version) + { + return new ServerConnection(channel, version, connectionTracker); + } + }; + public final InetSocketAddress socket; private final AtomicBoolean isRunning = new AtomicBoolean(false); @@ -163,6 +171,7 @@ public class Server implements CassandraDaemon.Server logger.info("Stop listening for CQL clients"); } + public static class ConnectionTracker implements Connection.Tracker { public final ChannelGroup allChannels = new DefaultChannelGroup(); @@ -224,7 +233,7 @@ public class Server implements CassandraDaemon.Server //pipeline.addLast("debug", new LoggingHandler()); - pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionTracker, ServerConnection.FACTORY)); + pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); pipeline.addLast("frameEncoder", frameEncoder); pipeline.addLast("frameDecompressor", frameDecompressor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/ServerConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java b/src/java/org/apache/cassandra/transport/ServerConnection.java index 7ec4a00..97b6b5a 100644 --- a/src/java/org/apache/cassandra/transport/ServerConnection.java +++ b/src/java/org/apache/cassandra/transport/ServerConnection.java @@ -19,6 +19,8 @@ package org.apache.cassandra.transport; import java.util.concurrent.ConcurrentMap; +import org.jboss.netty.channel.Channel; + import org.apache.cassandra.auth.IAuthenticator; import org.apache.cassandra.auth.ISaslAwareAuthenticator; import org.apache.cassandra.config.DatabaseDescriptor; @@ -30,14 +32,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap; public class ServerConnection extends Connection { - public static final Factory FACTORY = new Factory() - { - public Connection newConnection(Connection.Tracker tracker) - { - return new ServerConnection(tracker); - } - }; - private enum State { UNINITIALIZED, AUTHENTICATION, READY; } private volatile SaslAuthenticator saslAuthenticator; @@ -46,9 +40,9 @@ public class ServerConnection extends Connection private final ConcurrentMap<Integer, QueryState> queryStates = new NonBlockingHashMap<Integer, QueryState>(); - public ServerConnection(Connection.Tracker tracker) + public ServerConnection(Channel channel, int version, Connection.Tracker tracker) { - super(tracker); + super(channel, version, tracker); this.clientState = new ClientState(); this.state = State.UNINITIALIZED; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/SimpleClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java index b9dd3f5..df4f811 100644 --- a/src/java/org/apache/cassandra/transport/SimpleClient.java +++ b/src/java/org/apache/cassandra/transport/SimpleClient.java @@ -71,15 +71,17 @@ public class SimpleClient protected final ResponseHandler responseHandler = new ResponseHandler(); protected final Connection.Tracker tracker = new ConnectionTracker(); - protected final Connection connection = new Connection(tracker); + // We don't track connection really, so we don't need one Connection per channel + protected final Connection connection = new Connection(null, Frame.Header.CURRENT_VERSION, tracker); protected ClientBootstrap bootstrap; protected Channel channel; protected ChannelFuture lastWriteFuture; private final Connection.Factory connectionFactory = new Connection.Factory() { - public Connection newConnection(Connection.Tracker tracker) + public Connection newConnection(Channel channel, int version) { + assert version == Frame.Header.CURRENT_VERSION; return connection; } }; @@ -227,7 +229,7 @@ public class SimpleClient //pipeline.addLast("debug", new LoggingHandler()); - pipeline.addLast("frameDecoder", new Frame.Decoder(tracker, connectionFactory)); + pipeline.addLast("frameDecoder", new Frame.Decoder(connectionFactory)); pipeline.addLast("frameEncoder", frameEncoder); pipeline.addLast("frameDecompressor", frameDecompressor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java index 63df7d0..92f18bd 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java @@ -54,10 +54,9 @@ public class AuthChallenge extends Message.Response this.token = token; } - @Override - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public byte[] getToken() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java index 8a33a72..6b2eb24 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java @@ -66,10 +66,9 @@ public class AuthResponse extends Message.Request this.token = token; } - @Override - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java index 13c750a..e4b75e3 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java @@ -57,10 +57,9 @@ public class AuthSuccess extends Message.Response this.token = token; } - @Override - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public byte[] getToken() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java index 292f748..11d6443 100644 --- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java @@ -49,9 +49,9 @@ public class AuthenticateMessage extends Message.Response this.authenticator = authenticator; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 9fb4482..ad348e3 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -158,9 +158,9 @@ public class BatchMessage extends Message.Request this.consistency = consistency; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java index 207907a..a00d98b 100644 --- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java @@ -76,9 +76,9 @@ public class CredentialsMessage extends Message.Request super(Message.Type.CREDENTIALS); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java index 3675f08..7bc65e5 100644 --- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java @@ -211,9 +211,9 @@ public class ErrorMessage extends Message.Response return new ErrorMessage(new ServerError(e), streamId); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/EventMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java b/src/java/org/apache/cassandra/transport/messages/EventMessage.java index f7a93ae..46399fe 100644 --- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java @@ -46,9 +46,9 @@ public class EventMessage extends Message.Response this.setStreamId(-1); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 6dd3fc6..f83df9d 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -167,9 +167,9 @@ public class ExecuteMessage extends Message.Request this.pagingState = pagingState; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java index 49d8e1b..53f8504 100644 --- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java @@ -53,9 +53,9 @@ public class OptionsMessage extends Message.Request super(Message.Type.OPTIONS); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java index 851f3f8..d7eaa5b 100644 --- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java @@ -52,9 +52,9 @@ public class PrepareMessage extends Message.Request this.query = query; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index e9f3d46..9e8050c 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -184,9 +184,9 @@ public class QueryMessage extends Message.Request this.pagingState = pagingState; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java index 414fdd3..fe90cd9 100644 --- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java @@ -45,9 +45,9 @@ public class ReadyMessage extends Message.Response super(Message.Type.READY); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java index a6816fb..1902f9a 100644 --- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java @@ -67,9 +67,9 @@ public class RegisterMessage extends Message.Request return new ReadyMessage(); } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ResultMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java index 9e75f9d..b703cd0 100644 --- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java @@ -48,7 +48,7 @@ public abstract class ResultMessage extends Message.Response ChannelBuffer kcb = ChannelBuffers.buffer(4); kcb.writeInt(msg.kind.id); - ChannelBuffer body = msg.encodeBody(); + ChannelBuffer body = msg.encodeBody(version); return ChannelBuffers.wrappedBuffer(kcb, body); } }; @@ -102,12 +102,12 @@ public abstract class ResultMessage extends Message.Response this.kind = kind; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } - protected abstract ChannelBuffer encodeBody(); + protected abstract ChannelBuffer encodeBody(int version); public abstract CqlResult toThriftResult(); @@ -134,9 +134,9 @@ public abstract class ResultMessage extends Message.Response } }; - protected ChannelBuffer encodeBody() + protected ChannelBuffer encodeBody(int version) { - return subcodec.encode(this, getVersion()); + return subcodec.encode(this, version); } public CqlResult toThriftResult() @@ -176,9 +176,9 @@ public abstract class ResultMessage extends Message.Response } }; - protected ChannelBuffer encodeBody() + protected ChannelBuffer encodeBody(int version) { - return subcodec.encode(this, getVersion()); + return subcodec.encode(this, version); } public CqlResult toThriftResult() @@ -218,9 +218,9 @@ public abstract class ResultMessage extends Message.Response this.result = result; } - protected ChannelBuffer encodeBody() + protected ChannelBuffer encodeBody(int version) { - return subcodec.encode(this, getVersion()); + return subcodec.encode(this, version); } public CqlResult toThriftResult() @@ -299,9 +299,9 @@ public abstract class ResultMessage extends Message.Response return ((SelectStatement)statement).getResultMetadata(); } - protected ChannelBuffer encodeBody() + protected ChannelBuffer encodeBody(int version) { - return subcodec.encode(this, getVersion()); + return subcodec.encode(this, version); } public CqlResult toThriftResult() @@ -382,9 +382,9 @@ public abstract class ResultMessage extends Message.Response } }; - protected ChannelBuffer encodeBody() + protected ChannelBuffer encodeBody(int version) { - return subcodec.encode(this, getVersion()); + return subcodec.encode(this, version); } public CqlResult toThriftResult() http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/StartupMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java index 6d6d1d9..9ca6a4c 100644 --- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java @@ -61,9 +61,9 @@ public class StartupMessage extends Message.Request this.options = options; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } public Message.Response execute(QueryState state) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java index 7318112..0184a8c 100644 --- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java @@ -54,9 +54,9 @@ public class SupportedMessage extends Message.Response this.supported = supported; } - public ChannelBuffer encode() + public ChannelBuffer encode(int version) { - return codec.encode(this, getVersion()); + return codec.encode(this, version); } @Override
