Updated Branches:
  refs/heads/trunk 5ac567628 -> 5a18e3706

Native protocol event don't respect the protocol version

patch by slebresne; reviewed by iamaleksey for CASSANDRA-5778


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5a18e370
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5a18e370
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5a18e370

Branch: refs/heads/trunk
Commit: 5a18e3706424270439aaacbf8ef9b57547510459
Parents: 5ac5676
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Jul 19 17:21:40 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Jul 19 17:21:40 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 doc/native_protocol_v2.spec                     | 11 +++--
 .../apache/cassandra/transport/Connection.java  | 19 ++++---
 .../org/apache/cassandra/transport/Frame.java   | 52 +++++++++++---------
 .../org/apache/cassandra/transport/Message.java | 34 +++++--------
 .../org/apache/cassandra/transport/Server.java  | 11 ++++-
 .../cassandra/transport/ServerConnection.java   | 14 ++----
 .../cassandra/transport/SimpleClient.java       |  8 +--
 .../transport/messages/AuthChallenge.java       |  5 +-
 .../transport/messages/AuthResponse.java        |  5 +-
 .../transport/messages/AuthSuccess.java         |  5 +-
 .../transport/messages/AuthenticateMessage.java |  4 +-
 .../transport/messages/BatchMessage.java        |  4 +-
 .../transport/messages/CredentialsMessage.java  |  4 +-
 .../transport/messages/ErrorMessage.java        |  4 +-
 .../transport/messages/EventMessage.java        |  4 +-
 .../transport/messages/ExecuteMessage.java      |  4 +-
 .../transport/messages/OptionsMessage.java      |  4 +-
 .../transport/messages/PrepareMessage.java      |  4 +-
 .../transport/messages/QueryMessage.java        |  4 +-
 .../transport/messages/ReadyMessage.java        |  4 +-
 .../transport/messages/RegisterMessage.java     |  4 +-
 .../transport/messages/ResultMessage.java       | 28 +++++------
 .../transport/messages/StartupMessage.java      |  4 +-
 .../transport/messages/SupportedMessage.java    |  4 +-
 25 files changed, 126 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6b2e2ba..6fa719f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -12,6 +12,7 @@
  * Fix bugs in the native protocol v2 (CASSANDRA-5770)
  * CAS on 'primary key only' table (CASSANDRA-5715)
  * Support streaming SSTables of old versions (CASSANDRA-5772)
+ * Always respect protocol version in native protocol (CASSANDRA-5778)
 
 
 2.0.0-beta1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/doc/native_protocol_v2.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol_v2.spec b/doc/native_protocol_v2.spec
index 76d697f..9b6d52c 100644
--- a/doc/native_protocol_v2.spec
+++ b/doc/native_protocol_v2.spec
@@ -69,10 +69,8 @@ Table of Contents
 
   The protocol distinguishes 2 types of frames: requests and responses. 
Requests
   are those frame sent by the clients to the server, response are the ones sent
-  by the server. Note however that while communication are initiated by the
-  client with the server responding to request, the protocol may likely add
-  server pushes in the future, so responses does not obligatory come right 
after
-  a client request.
+  by the server. Note however that the protocol supports server pushes (events)
+  so responses does not necessarily come right after a client request.
 
   Note to client implementors: clients library should always assume that the
   body of a given frame may contain more data than what is described in this
@@ -98,6 +96,11 @@ Table of Contents
     0x02    Request frame for this protocol version
     0x82    Response frame for this protocol version
 
+  Please note that the while every message ship with the version, only one 
version
+  of messages is accepted on a given connection. In other words, the first 
message
+  exchanged (STARTUP) sets the version for the connection for the lifetime of 
this
+  connection.
+
   This document describe the version 2 of the protocol. For the changes made 
since
   version 1, see Section 9.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Connection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Connection.java 
b/src/java/org/apache/cassandra/transport/Connection.java
index 67f06a4..a72402f 100644
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@ -21,13 +21,19 @@ import org.jboss.netty.channel.Channel;
 
 public class Connection
 {
-    private volatile FrameCompressor frameCompressor;
-    private volatile Channel channel;
+    private final Channel channel;
+    private final int version;
     private final Tracker tracker;
 
-    public Connection(Tracker tracker)
+    private volatile FrameCompressor frameCompressor;
+
+    public Connection(Channel channel, int version, Tracker tracker)
     {
+        this.channel = channel;
+        this.version = version;
         this.tracker = tracker;
+
+        tracker.addConnection(channel, this);
     }
 
     public void setCompressor(FrameCompressor compressor)
@@ -45,10 +51,9 @@ public class Connection
         return tracker;
     }
 
-    public void registerChannel(Channel ch)
+    public int getVersion()
     {
-        channel = ch;
-        tracker.addConnection(ch, this);
+        return version;
     }
 
     public Channel channel()
@@ -58,7 +63,7 @@ public class Connection
 
     public interface Factory
     {
-        public Connection newConnection(Tracker tracker);
+        public Connection newConnection(Channel channel, int version);
     }
 
     public interface Tracker

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index 014d512..b19ea33 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -34,7 +34,6 @@ public class Frame
 {
     public final Header header;
     public final ChannelBuffer body;
-    public final Connection connection;
 
     /**
      * On-wire frame.
@@ -47,14 +46,13 @@ public class Frame
      *   |                length                 |
      *   +---------+---------+---------+---------+
      */
-    private Frame(Header header, ChannelBuffer body, Connection connection)
+    private Frame(Header header, ChannelBuffer body)
     {
         this.header = header;
         this.body = body;
-        this.connection = connection;
     }
 
-    public static Frame create(ChannelBuffer fullFrame, Connection connection)
+    public static Frame create(ChannelBuffer fullFrame)
     {
         assert fullFrame.readableBytes() >= Header.LENGTH : 
String.format("Frame too short (%d bytes = %s)",
                                                                           
fullFrame.readableBytes(),
@@ -72,13 +70,13 @@ public class Frame
         version = version & 0x7F;
 
         Header header = new Header(version, flags, streamId, 
Message.Type.fromOpcode(opcode, direction));
-        return new Frame(header, fullFrame, connection);
+        return new Frame(header, fullFrame);
     }
 
-    public static Frame create(Message.Type type, int streamId, int version, 
EnumSet<Header.Flag> flags, ChannelBuffer body, Connection connection)
+    public static Frame create(Message.Type type, int streamId, int version, 
EnumSet<Header.Flag> flags, ChannelBuffer body)
     {
         Header header = new Header(version, flags, streamId, type);
-        return new Frame(header, body, connection);
+        return new Frame(header, body);
     }
 
     public static class Header
@@ -134,25 +132,19 @@ public class Frame
 
     public Frame with(ChannelBuffer newBody)
     {
-        return new Frame(header, newBody, connection);
+        return new Frame(header, newBody);
     }
 
     public static class Decoder extends LengthFieldBasedFrameDecoder
     {
         private static final int MAX_FRAME_LENTH = 256 * 1024 * 1024; // 256 MB
-        private final Connection connection;
 
-        public Decoder(Connection.Tracker tracker, Connection.Factory factory)
-        {
-            super(MAX_FRAME_LENTH, 4, 4, 0, 0, true);
-            this.connection = factory.newConnection(tracker);
-        }
+        private final Connection.Factory factory;
 
-        @Override
-        public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
-        throws Exception
+        public Decoder(Connection.Factory factory)
         {
-            connection.registerChannel(e.getChannel());
+            super(MAX_FRAME_LENTH, 4, 4, 0, 0, true);
+            this.factory = factory;
         }
 
         @Override
@@ -183,7 +175,19 @@ public class Frame
                 {
                     return null;
                 }
-                return Frame.create(frame, connection);
+
+                Connection connection = (Connection)channel.getAttachment();
+                if (connection == null)
+                {
+                    // First message seen on this channel, attach the 
connection object
+                    connection = factory.newConnection(channel, version);
+                    channel.setAttachment(connection);
+                }
+                else if (connection.getVersion() != version)
+                {
+                    throw new ProtocolException(String.format("Invalid message 
version. Got %d but previous messages on this connection had version %d", 
version, connection.getVersion()));
+                }
+                return Frame.create(frame);
             }
             catch (CorruptedFrameException e)
             {
@@ -225,11 +229,12 @@ public class Frame
             assert msg instanceof Frame : "Expecting frame, got " + msg;
 
             Frame frame = (Frame)msg;
+            Connection connection = (Connection)channel.getAttachment();
 
-            if (!frame.header.flags.contains(Header.Flag.COMPRESSED))
+            if (!frame.header.flags.contains(Header.Flag.COMPRESSED) || 
connection == null)
                 return frame;
 
-            FrameCompressor compressor = frame.connection.getCompressor();
+            FrameCompressor compressor = connection.getCompressor();
             if (compressor == null)
                 return frame;
 
@@ -245,12 +250,13 @@ public class Frame
             assert msg instanceof Frame : "Expecting frame, got " + msg;
 
             Frame frame = (Frame)msg;
+            Connection connection = (Connection)channel.getAttachment();
 
             // Never compress STARTUP messages
-            if (frame.header.type == Message.Type.STARTUP || frame.connection 
== null)
+            if (frame.header.type == Message.Type.STARTUP || connection == 
null)
                 return frame;
 
-            FrameCompressor compressor = frame.connection.getCompressor();
+            FrameCompressor compressor = connection.getCompressor();
             if (compressor == null)
                 return frame;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 8c46f9b..557bddf 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -119,7 +119,6 @@ public abstract class Message
     public final Type type;
     protected volatile Connection connection;
     private volatile int streamId;
-    private volatile int version = Frame.Header.CURRENT_VERSION;
 
     protected Message(Type type)
     {
@@ -147,18 +146,7 @@ public abstract class Message
         return streamId;
     }
 
-    public int getVersion()
-    {
-        return version;
-    }
-
-    public Message setVersion(int version)
-    {
-        this.version = version;
-        return this;
-    }
-
-    public abstract ChannelBuffer encode();
+    public abstract ChannelBuffer encode(int version);
 
     public static abstract class Request extends Message
     {
@@ -225,13 +213,12 @@ public abstract class Message
             {
                 Message message = frame.header.type.codec.decode(frame.body, 
frame.header.version);
                 message.setStreamId(frame.header.streamId);
-                message.setVersion(frame.header.version);
 
                 if (isRequest)
                 {
                     assert message instanceof Request;
                     Request req = (Request)message;
-                    req.attach(frame.connection);
+                    req.attach((Connection)channel.getAttachment());
                     if (isTracing)
                         req.setTracingRequested();
                 }
@@ -260,7 +247,11 @@ public abstract class Message
 
             Message message = (Message)msg;
 
-            ChannelBuffer body = message.encode();
+            Connection connection = (Connection)channel.getAttachment();
+            // The only case the connection can be null is when we send the 
initial STARTUP message (client side thus)
+            int version = connection == null ? Frame.Header.CURRENT_VERSION : 
connection.getVersion();
+
+            ChannelBuffer body = message.encode(version);
             EnumSet<Frame.Header.Flag> flags = 
EnumSet.noneOf(Frame.Header.Flag.class);
             if (message instanceof Response)
             {
@@ -278,7 +269,7 @@ public abstract class Message
                     flags.add(Frame.Header.Flag.TRACING);
             }
 
-            return Frame.create(message.type, message.getStreamId(), 
message.getVersion(), flags, body, message.connection());
+            return Frame.create(message.type, message.getStreamId(), version, 
flags, body);
         }
     }
 
@@ -298,24 +289,23 @@ public abstract class Message
             {
                 assert request.connection() instanceof ServerConnection;
                 ServerConnection connection = 
(ServerConnection)request.connection();
-                QueryState qstate = 
connection.validateNewMessage(request.type, request.getVersion(), 
request.getStreamId());
+                QueryState qstate = 
connection.validateNewMessage(request.type, connection.getVersion(), 
request.getStreamId());
 
-                logger.debug("Received: {}, v={}", request, 
request.getVersion());
+                logger.debug("Received: {}, v={}", request, 
connection.getVersion());
 
                 Response response = request.execute(qstate);
                 response.setStreamId(request.getStreamId());
-                response.setVersion(request.getVersion());
                 response.attach(connection);
                 connection.applyStateTransition(request.type, response.type);
 
-                logger.debug("Responding: {}, v={}", response, 
response.getVersion());
+                logger.debug("Responding: {}, v={}", response, 
connection.getVersion());
 
                 ctx.getChannel().write(response);
             }
             catch (Exception ex)
             {
                 // Don't let the exception propagate to exceptionCaught() if 
we can help it so that we can assign the right streamID.
-                
ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()).setVersion(request.getVersion()));
+                
ctx.getChannel().write(ErrorMessage.fromException(ex).setStreamId(request.getStreamId()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/Server.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Server.java 
b/src/java/org/apache/cassandra/transport/Server.java
index 31f5df2..0a0a77c 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -66,6 +66,14 @@ public class Server implements CassandraDaemon.Server
 
     private final ConnectionTracker connectionTracker = new 
ConnectionTracker();
 
+    private final Connection.Factory connectionFactory = new 
Connection.Factory()
+    {
+        public Connection newConnection(Channel channel, int version)
+        {
+            return new ServerConnection(channel, version, connectionTracker);
+        }
+    };
+
     public final InetSocketAddress socket;
     private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
@@ -163,6 +171,7 @@ public class Server implements CassandraDaemon.Server
         logger.info("Stop listening for CQL clients");
     }
 
+
     public static class ConnectionTracker implements Connection.Tracker
     {
         public final ChannelGroup allChannels = new DefaultChannelGroup();
@@ -224,7 +233,7 @@ public class Server implements CassandraDaemon.Server
 
             //pipeline.addLast("debug", new LoggingHandler());
 
-            pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionTracker, ServerConnection.FACTORY));
+            pipeline.addLast("frameDecoder", new 
Frame.Decoder(server.connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
             pipeline.addLast("frameDecompressor", frameDecompressor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/ServerConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/ServerConnection.java 
b/src/java/org/apache/cassandra/transport/ServerConnection.java
index 7ec4a00..97b6b5a 100644
--- a/src/java/org/apache/cassandra/transport/ServerConnection.java
+++ b/src/java/org/apache/cassandra/transport/ServerConnection.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.transport;
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.jboss.netty.channel.Channel;
+
 import org.apache.cassandra.auth.IAuthenticator;
 import org.apache.cassandra.auth.ISaslAwareAuthenticator;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -30,14 +32,6 @@ import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ServerConnection extends Connection
 {
-    public static final Factory FACTORY = new Factory()
-    {
-        public Connection newConnection(Connection.Tracker tracker)
-        {
-            return new ServerConnection(tracker);
-        }
-    };
-
     private enum State { UNINITIALIZED, AUTHENTICATION, READY; }
 
     private volatile SaslAuthenticator saslAuthenticator;
@@ -46,9 +40,9 @@ public class ServerConnection extends Connection
 
     private final ConcurrentMap<Integer, QueryState> queryStates = new 
NonBlockingHashMap<Integer, QueryState>();
 
-    public ServerConnection(Connection.Tracker tracker)
+    public ServerConnection(Channel channel, int version, Connection.Tracker 
tracker)
     {
-        super(tracker);
+        super(channel, version, tracker);
         this.clientState = new ClientState();
         this.state = State.UNINITIALIZED;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index b9dd3f5..df4f811 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -71,15 +71,17 @@ public class SimpleClient
 
     protected final ResponseHandler responseHandler = new ResponseHandler();
     protected final Connection.Tracker tracker = new ConnectionTracker();
-    protected final Connection connection = new Connection(tracker);
+    // We don't track connection really, so we don't need one Connection per 
channel
+    protected final Connection connection = new Connection(null, 
Frame.Header.CURRENT_VERSION, tracker);
     protected ClientBootstrap bootstrap;
     protected Channel channel;
     protected ChannelFuture lastWriteFuture;
 
     private final Connection.Factory connectionFactory = new 
Connection.Factory()
     {
-        public Connection newConnection(Connection.Tracker tracker)
+        public Connection newConnection(Channel channel, int version)
         {
+            assert version == Frame.Header.CURRENT_VERSION;
             return connection;
         }
     };
@@ -227,7 +229,7 @@ public class SimpleClient
 
             //pipeline.addLast("debug", new LoggingHandler());
 
-            pipeline.addLast("frameDecoder", new Frame.Decoder(tracker, 
connectionFactory));
+            pipeline.addLast("frameDecoder", new 
Frame.Decoder(connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
             pipeline.addLast("frameDecompressor", frameDecompressor);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java 
b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
index 63df7d0..92f18bd 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthChallenge.java
@@ -54,10 +54,9 @@ public class AuthChallenge extends Message.Response
         this.token = token;
     }
 
-    @Override
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java 
b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 8a33a72..6b2eb24 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -66,10 +66,9 @@ public class AuthResponse extends Message.Request
         this.token = token;
     }
 
-    @Override
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java 
b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
index 13c750a..e4b75e3 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthSuccess.java
@@ -57,10 +57,9 @@ public class AuthSuccess extends Message.Response
         this.token = token;
     }
 
-    @Override
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public byte[] getToken()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java 
b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
index 292f748..11d6443 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthenticateMessage.java
@@ -49,9 +49,9 @@ public class AuthenticateMessage extends Message.Response
         this.authenticator = authenticator;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java 
b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 9fb4482..ad348e3 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -158,9 +158,9 @@ public class BatchMessage extends Message.Request
         this.consistency = consistency;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java 
b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index 207907a..a00d98b 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -76,9 +76,9 @@ public class CredentialsMessage extends Message.Request
         super(Message.Type.CREDENTIALS);
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
index 3675f08..7bc65e5 100644
--- a/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ErrorMessage.java
@@ -211,9 +211,9 @@ public class ErrorMessage extends Message.Response
         return new ErrorMessage(new ServerError(e), streamId);
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/EventMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/EventMessage.java 
b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
index f7a93ae..46399fe 100644
--- a/src/java/org/apache/cassandra/transport/messages/EventMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/EventMessage.java
@@ -46,9 +46,9 @@ public class EventMessage extends Message.Response
         this.setStreamId(-1);
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index 6dd3fc6..f83df9d 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -167,9 +167,9 @@ public class ExecuteMessage extends Message.Request
         this.pagingState = pagingState;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java 
b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 49d8e1b..53f8504 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -53,9 +53,9 @@ public class OptionsMessage extends Message.Request
         super(Message.Type.OPTIONS);
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java 
b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index 851f3f8..d7eaa5b 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -52,9 +52,9 @@ public class PrepareMessage extends Message.Request
         this.query = query;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java 
b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index e9f3d46..9e8050c 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -184,9 +184,9 @@ public class QueryMessage extends Message.Request
         this.pagingState = pagingState;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
index 414fdd3..fe90cd9 100644
--- a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -45,9 +45,9 @@ public class ReadyMessage extends Message.Response
         super(Message.Type.READY);
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java 
b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index a6816fb..1902f9a 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -67,9 +67,9 @@ public class RegisterMessage extends Message.Request
         return new ReadyMessage();
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java 
b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
index 9e75f9d..b703cd0 100644
--- a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -48,7 +48,7 @@ public abstract class ResultMessage extends Message.Response
             ChannelBuffer kcb = ChannelBuffers.buffer(4);
             kcb.writeInt(msg.kind.id);
 
-            ChannelBuffer body = msg.encodeBody();
+            ChannelBuffer body = msg.encodeBody(version);
             return ChannelBuffers.wrappedBuffer(kcb, body);
         }
     };
@@ -102,12 +102,12 @@ public abstract class ResultMessage extends 
Message.Response
         this.kind = kind;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
-    protected abstract ChannelBuffer encodeBody();
+    protected abstract ChannelBuffer encodeBody(int version);
 
     public abstract CqlResult toThriftResult();
 
@@ -134,9 +134,9 @@ public abstract class ResultMessage extends Message.Response
             }
         };
 
-        protected ChannelBuffer encodeBody()
+        protected ChannelBuffer encodeBody(int version)
         {
-            return subcodec.encode(this, getVersion());
+            return subcodec.encode(this, version);
         }
 
         public CqlResult toThriftResult()
@@ -176,9 +176,9 @@ public abstract class ResultMessage extends Message.Response
             }
         };
 
-        protected ChannelBuffer encodeBody()
+        protected ChannelBuffer encodeBody(int version)
         {
-            return subcodec.encode(this, getVersion());
+            return subcodec.encode(this, version);
         }
 
         public CqlResult toThriftResult()
@@ -218,9 +218,9 @@ public abstract class ResultMessage extends Message.Response
             this.result = result;
         }
 
-        protected ChannelBuffer encodeBody()
+        protected ChannelBuffer encodeBody(int version)
         {
-            return subcodec.encode(this, getVersion());
+            return subcodec.encode(this, version);
         }
 
         public CqlResult toThriftResult()
@@ -299,9 +299,9 @@ public abstract class ResultMessage extends Message.Response
             return ((SelectStatement)statement).getResultMetadata();
         }
 
-        protected ChannelBuffer encodeBody()
+        protected ChannelBuffer encodeBody(int version)
         {
-            return subcodec.encode(this, getVersion());
+            return subcodec.encode(this, version);
         }
 
         public CqlResult toThriftResult()
@@ -382,9 +382,9 @@ public abstract class ResultMessage extends Message.Response
             }
         };
 
-        protected ChannelBuffer encodeBody()
+        protected ChannelBuffer encodeBody(int version)
         {
-            return subcodec.encode(this, getVersion());
+            return subcodec.encode(this, version);
         }
 
         public CqlResult toThriftResult()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java 
b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 6d6d1d9..9ca6a4c 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -61,9 +61,9 @@ public class StartupMessage extends Message.Request
         this.options = options;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     public Message.Response execute(QueryState state)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5a18e370/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java 
b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
index 7318112..0184a8c 100644
--- a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -54,9 +54,9 @@ public class SupportedMessage extends Message.Response
         this.supported = supported;
     }
 
-    public ChannelBuffer encode()
+    public ChannelBuffer encode(int version)
     {
-        return codec.encode(this, getVersion());
+        return codec.encode(this, version);
     }
 
     @Override

Reply via email to