Updated Branches:
  refs/heads/trunk b93960e28 -> bef8eef1b

Binary protocol: handle asynchronous execution (better)

patch by slebresne; reviewed by thepaul for CASSANDRA-4473


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bef8eef1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bef8eef1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bef8eef1

Branch: refs/heads/trunk
Commit: bef8eef1b12b6ea8e9f6ee3ce12547c7302d7bb9
Parents: b93960e
Author: Sylvain Lebresne <[email protected]>
Authored: Wed Aug 1 20:35:51 2012 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Aug 1 20:35:51 2012 +0200

----------------------------------------------------------------------
 doc/native_protocol.spec                           |   54 ++++++++++-----
 src/java/org/apache/cassandra/transport/Frame.java |   22 ++++---
 .../org/apache/cassandra/transport/Message.java    |   15 ++++-
 3 files changed, 64 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/doc/native_protocol.spec
----------------------------------------------------------------------
diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec
index 868a46e..ae3c1fd 100644
--- a/doc/native_protocol.spec
+++ b/doc/native_protocol.spec
@@ -39,7 +39,7 @@ Table of Contents
 
       0         8        16        24        32
       +---------+---------+---------+---------+
-      | version |  flags  |      opcode       |
+      | version |  flags  | stream  | opcode  |
       +---------+---------+---------+---------+
       |                length                 |
       +---------+---------+---------+---------+
@@ -50,7 +50,7 @@ Table of Contents
       +----------------------------------------
 
   Each frame contains a fixed size header (8 bytes) followed by a variable size
-  body.  The header is described in Section 2. The content of the body depends
+  body. The header is described in Section 2. The content of the body depends
   on the header opcode value (the body can in particular be empty for some
-  opcode values). The list of allowed opcode is defined Section 2.3 and the
+  opcode values). The list of allowed opcodes is defined in Section 2.4 and the
   details of each corresponding message is described Section 4.
@@ -94,21 +94,41 @@ Table of Contents
   through the Startup message (which thus cannot be compressed; Section 4.1.1).
   The rest of the flags is kept for future use.
 
-
-2.3. opcode
-
-  A 2 byte integer that distinguish the actual message:
-    0x0000    ERROR
-    0x0001    STARTUP
-    0x0002    READY
-    0x0003    AUTHENTICATE
-    0x0004    CREDENTIALS
-    0x0005    OPTIONS
-    0x0006    SUPPORTED
-    0x0007    QUERY
-    0x0008    RESULT
-    0x0009    PREPARE
-    0x000A    EXECUTE
+2.3. stream
+
+  A frame has a stream id (one signed byte). When sending request messages, this
+  stream id must be set by the client to a non-negative byte (negative stream
+  ids are reserved for future streams initiated by the server). If a client
+  sends a request message with the stream id X, it is guaranteed that the
+  stream id of the response to that message will be X.
+
+  This allows clients to deal with the asynchronous nature of the protocol. If a
+  client sends multiple messages simultaneously (without waiting for responses),
+  there is no guarantee on the order of the responses. For instance, if the
+  client writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might
+  respond to REQ_3 (or REQ_2) first. Assigning different stream ids to these 3
+  requests allows the client to determine which request a received answer
+  responds to. As there can only be 128 different simultaneous streams, it is up
+  to the client to reuse stream ids.
+
+  Note that clients are free to use the protocol synchronously (i.e. wait for
+  the response to REQ_N before sending REQ_N+1). In that case, the stream id
+  can be safely set to 0.
+
+2.4. opcode
+
+  A one-byte integer that distinguishes the actual message:
+    0x00    ERROR
+    0x01    STARTUP
+    0x02    READY
+    0x03    AUTHENTICATE
+    0x04    CREDENTIALS
+    0x05    OPTIONS
+    0x06    SUPPORTED
+    0x07    QUERY
+    0x08    RESULT
+    0x09    PREPARE
+    0x0A    EXECUTE
 
   Messages are described in Section 4.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index 9d4885d..3f85b1a 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -42,7 +42,7 @@ public class Frame
      *
      *   0         8        16        24        32
      *   +---------+---------+---------+---------+
-     *   | version |  flags  |      opcode       |
+     *   | version |  flags  | stream  | opcode  |
      *   +---------+---------+---------+---------+
      *   |                length                 |
      *   +---------+---------+---------+---------+
@@ -62,7 +62,8 @@ public class Frame
 
         int version = fullFrame.readByte();
         int flags = fullFrame.readByte();
-        int opcode = fullFrame.readUnsignedShort();
+        int streamId = fullFrame.readByte();
+        int opcode = fullFrame.readByte();
         int length = fullFrame.readInt();
         assert length == fullFrame.readableBytes();
 
@@ -70,14 +71,14 @@ public class Frame
         Message.Direction direction = 
Message.Direction.extractFromVersion(version);
         version = version & 0x7F;
 
-        Header header = new Header(version, flags, 
Message.Type.fromOpcode(opcode, direction));
+        Header header = new Header(version, flags, streamId, 
Message.Type.fromOpcode(opcode, direction));
         return new Frame(header, fullFrame, connection);
     }
 
-    public static Frame create(Message.Type type, ChannelBuffer body, 
Connection connection)
+    public static Frame create(Message.Type type, int streamId, ChannelBuffer 
body, Connection connection)
     {
         EnumSet<Header.Flag> flags = EnumSet.noneOf(Header.Flag.class);
-        Header header = new Header(Header.CURRENT_VERSION, flags, type);
+        Header header = new Header(Header.CURRENT_VERSION, flags, streamId, 
type);
         return new Frame(header, body, connection);
     }
 
@@ -88,17 +89,19 @@ public class Frame
 
         public final int version;
         public final EnumSet<Flag> flags;
+        public final int streamId;
         public final Message.Type type;
 
-        private Header(int version, int flags, Message.Type type)
+        private Header(int version, int flags, int streamId, Message.Type type)
         {
-            this(version, Flag.deserialize(flags), type);
+            this(version, Flag.deserialize(flags), streamId, type);
         }
 
-        private Header(int version, EnumSet<Flag> flags, Message.Type type)
+        private Header(int version, EnumSet<Flag> flags, int streamId, 
Message.Type type)
         {
             this.version = version;
             this.flags = flags;
+            this.streamId = streamId;
             this.type = type;
         }
 
@@ -187,7 +190,8 @@ public class Frame
             Message.Type type = frame.header.type;
             
header.writeByte(type.direction.addToVersion(frame.header.version));
             header.writeByte(Header.Flag.serialize(frame.header.flags));
-            header.writeShort(type.opcode);
+            header.writeByte(frame.header.streamId);
+            header.writeByte(type.opcode);
             header.writeInt(frame.body.readableBytes());
 
             return ChannelBuffers.wrappedBuffer(header, frame.body);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index 9294cc8..0a51ad2 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -107,6 +107,7 @@ public abstract class Message
 
     public final Type type;
     protected Connection connection;
+    protected int streamId;
 
     protected Message(Type type)
     {
@@ -123,6 +124,16 @@ public abstract class Message
         return connection;
     }
 
+    public void setStreamId(int streamId)
+    {
+        this.streamId = streamId;
+    }
+
+    public int getStreamId()
+    {
+        return streamId;
+    }
+
     public abstract ChannelBuffer encode();
 
     public static abstract class Request extends Message
@@ -157,6 +168,7 @@ public abstract class Message
 
             Frame frame = (Frame)msg;
             Message message = frame.header.type.codec.decode(frame.body);
+            message.setStreamId(frame.header.streamId);
             if (message instanceof Request)
                 ((Request)message).attach(frame.connection);
             return message;
@@ -170,7 +182,7 @@ public abstract class Message
             assert msg instanceof Message : "Expecting message, got " + msg;
 
             Message message = (Message)msg;
-            return Frame.create(message.type, message.encode(), 
message.connection());
+            return Frame.create(message.type, message.getStreamId(), 
message.encode(), message.connection());
         }
     }
 
@@ -191,6 +203,7 @@ public abstract class Message
             logger.debug("Received: " + request);
 
             Response response = request.execute();
+            response.setStreamId(request.getStreamId());
             response.attach(connection);
             response.connection().applyStateTransition(request.type, 
response.type);
 

Reply via email to