Updated Branches: refs/heads/trunk b93960e28 -> bef8eef1b
Binary protocol: handle asynchronous execution (better) patch by slebresne; reviewed by thepaul for CASSANDRA-4473 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bef8eef1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bef8eef1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bef8eef1 Branch: refs/heads/trunk Commit: bef8eef1b12b6ea8e9f6ee3ce12547c7302d7bb9 Parents: b93960e Author: Sylvain Lebresne <[email protected]> Authored: Wed Aug 1 20:35:51 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Aug 1 20:35:51 2012 +0200 ---------------------------------------------------------------------- doc/native_protocol.spec | 54 ++++++++++----- src/java/org/apache/cassandra/transport/Frame.java | 22 ++++--- .../org/apache/cassandra/transport/Message.java | 15 ++++- 3 files changed, 64 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/doc/native_protocol.spec ---------------------------------------------------------------------- diff --git a/doc/native_protocol.spec b/doc/native_protocol.spec index 868a46e..ae3c1fd 100644 --- a/doc/native_protocol.spec +++ b/doc/native_protocol.spec @@ -39,7 +39,7 @@ Table of Contents 0 8 16 24 32 +---------+---------+---------+---------+ - | version | flags | opcode | + | version | flags | stream | opcode | +---------+---------+---------+---------+ | length | +---------+---------+---------+---------+ @@ -50,7 +50,7 @@ Table of Contents +---------------------------------------- Each frame contains a fixed size header (8 bytes) followed by a variable size - body. The header is described in Section 2. The content of the body depends + body. The header is described in Section 2. The content of the body depends on the header opcode value (the body can in particular be empty for some opcode values). 
The list of allowed opcode is defined Section 2.3 and the details of each corresponding message is described Section 4. @@ -94,21 +94,41 @@ Table of Contents through the Startup message (which thus cannot be compressed; Section 4.1.1). The rest of the flags is kept for future use. - -2.3. opcode - - A 2 byte integer that distinguish the actual message: - 0x0000 ERROR - 0x0001 STARTUP - 0x0002 READY - 0x0003 AUTHENTICATE - 0x0004 CREDENTIALS - 0x0005 OPTIONS - 0x0006 SUPPORTED - 0x0007 QUERY - 0x0008 RESULT - 0x0009 PREPARE - 0x000A EXECUTE +2.3. stream + + A frame has a stream id (one signed byte). When sending request messages, this + stream id must be set by the client to a non-negative byte (negative stream ids + are reserved for future streams initiated by the server). If a client sends a + request message with the stream id X, it is guaranteed that the stream id of + the response to that message will be X. + + This allows clients to deal with the asynchronous nature of the protocol. If a client + sends multiple messages simultaneously (without waiting for responses), there + is no guarantee on the order of the responses. For instance, if the client + writes REQ_1, REQ_2, REQ_3 on the wire (in that order), the server might + respond to REQ_3 (or REQ_2) first. Assigning different stream ids to these 3 + requests allows the client to determine which request a received answer + responds to. As there can only be 128 different simultaneous streams (ids 0 to 127), it is up + to the client to reuse stream ids. + + Note that clients are free to use the protocol synchronously (i.e. wait for + the response to REQ_N before sending REQ_N+1). In that case, the stream id + can be safely set to 0. + +2.4. opcode + + An integer byte that distinguishes the actual message: + 0x00 ERROR + 0x01 STARTUP + 0x02 READY + 0x03 AUTHENTICATE + 0x04 CREDENTIALS + 0x05 OPTIONS + 0x06 SUPPORTED + 0x07 QUERY + 0x08 RESULT + 0x09 PREPARE + 0x0A EXECUTE Messages are described in Section 4.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 9d4885d..3f85b1a 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -42,7 +42,7 @@ public class Frame * * 0 8 16 24 32 * +---------+---------+---------+---------+ - * | version | flags | opcode | + * | version | flags | stream | opcode | * +---------+---------+---------+---------+ * | length | * +---------+---------+---------+---------+ @@ -62,7 +62,8 @@ public class Frame int version = fullFrame.readByte(); int flags = fullFrame.readByte(); - int opcode = fullFrame.readUnsignedShort(); + int streamId = fullFrame.readByte(); + int opcode = fullFrame.readByte(); int length = fullFrame.readInt(); assert length == fullFrame.readableBytes(); @@ -70,14 +71,14 @@ public class Frame Message.Direction direction = Message.Direction.extractFromVersion(version); version = version & 0x7F; - Header header = new Header(version, flags, Message.Type.fromOpcode(opcode, direction)); + Header header = new Header(version, flags, streamId, Message.Type.fromOpcode(opcode, direction)); return new Frame(header, fullFrame, connection); } - public static Frame create(Message.Type type, ChannelBuffer body, Connection connection) + public static Frame create(Message.Type type, int streamId, ChannelBuffer body, Connection connection) { EnumSet<Header.Flag> flags = EnumSet.noneOf(Header.Flag.class); - Header header = new Header(Header.CURRENT_VERSION, flags, type); + Header header = new Header(Header.CURRENT_VERSION, flags, streamId, type); return new Frame(header, body, connection); } @@ -88,17 +89,19 @@ public class Frame public final int version; public final EnumSet<Flag> flags; + public final int streamId; public 
final Message.Type type; - private Header(int version, int flags, Message.Type type) + private Header(int version, int flags, int streamId, Message.Type type) { - this(version, Flag.deserialize(flags), type); + this(version, Flag.deserialize(flags), streamId, type); } - private Header(int version, EnumSet<Flag> flags, Message.Type type) + private Header(int version, EnumSet<Flag> flags, int streamId, Message.Type type) { this.version = version; this.flags = flags; + this.streamId = streamId; this.type = type; } @@ -187,7 +190,8 @@ public class Frame Message.Type type = frame.header.type; header.writeByte(type.direction.addToVersion(frame.header.version)); header.writeByte(Header.Flag.serialize(frame.header.flags)); - header.writeShort(type.opcode); + header.writeByte(frame.header.streamId); + header.writeByte(type.opcode); header.writeInt(frame.body.readableBytes()); return ChannelBuffers.wrappedBuffer(header, frame.body); http://git-wip-us.apache.org/repos/asf/cassandra/blob/bef8eef1/src/java/org/apache/cassandra/transport/Message.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java index 9294cc8..0a51ad2 100644 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@ -107,6 +107,7 @@ public abstract class Message public final Type type; protected Connection connection; + protected int streamId; protected Message(Type type) { @@ -123,6 +124,16 @@ public abstract class Message return connection; } + public void setStreamId(int streamId) + { + this.streamId = streamId; + } + + public int getStreamId() + { + return streamId; + } + public abstract ChannelBuffer encode(); public static abstract class Request extends Message @@ -157,6 +168,7 @@ public abstract class Message Frame frame = (Frame)msg; Message message = frame.header.type.codec.decode(frame.body); + 
message.setStreamId(frame.header.streamId); if (message instanceof Request) ((Request)message).attach(frame.connection); return message; @@ -170,7 +182,7 @@ public abstract class Message assert msg instanceof Message : "Expecting message, got " + msg; Message message = (Message)msg; - return Frame.create(message.type, message.encode(), message.connection()); + return Frame.create(message.type, message.getStreamId(), message.encode(), message.connection()); } } @@ -191,6 +203,7 @@ public abstract class Message logger.debug("Received: " + request); Response response = request.execute(); + response.setStreamId(request.getStreamId()); response.attach(connection); response.connection().applyStateTransition(request.type, response.type);
