http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
new file mode 100644
index 0000000..fa92619
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ReadyMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.Message;
+
+/**
+ * Message to indicate that the server is ready to receive requests.
+ * <p>
+ * The message has no payload: it always encodes to an empty buffer and
+ * decoding never reads from the body.
+ */
+public class ReadyMessage extends Message.Response
+{
+    /** Codec for READY; there is nothing to read or write besides the message type. */
+    public static final Message.Codec<ReadyMessage> codec = new Message.Codec<ReadyMessage>()
+    {
+        public ReadyMessage decode(ChannelBuffer ignoredBody)
+        {
+            // The body is not consulted: a fresh message is returned whatever it holds.
+            return new ReadyMessage();
+        }
+
+        public ChannelBuffer encode(ReadyMessage ignoredMsg)
+        {
+            return ChannelBuffers.EMPTY_BUFFER;
+        }
+    };
+
+    /** Creates a response of type {@code READY}. */
+    public ReadyMessage()
+    {
+        super(Message.Type.READY);
+    }
+
+    /** Serializes this message through {@link #codec}. */
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "READY";
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ResultMessage.java b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
new file mode 100644
index 0000000..35f8207
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/ResultMessage.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.TimeoutException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.ConfigurationException;
+import org.apache.cassandra.cql3.ColumnSpecification;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.ResultSet;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.thrift.CqlPreparedResult;
+import org.apache.cassandra.thrift.CqlResult;
+import org.apache.cassandra.thrift.CqlResultType;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Base class of the RESULT responses. The body starts with a 4-byte kind id
+ * that selects the codec used for the rest of the body (see {@code Kind}).
+ */
+public abstract class ResultMessage extends Message.Response
+{
+    /** Codec dispatching on the leading kind id: reads/writes the id, then delegates to the kind's subcodec. */
+    public static final Message.Codec<ResultMessage> codec = new Message.Codec<ResultMessage>()
+    {
+        public ResultMessage decode(ChannelBuffer body)
+        {
+            Kind kind = Kind.fromId(body.readInt());
+            return kind.subcodec.decode(body);
+        }
+
+        public ChannelBuffer encode(ResultMessage msg)
+        {
+            ChannelBuffer kcb = ChannelBuffers.buffer(4);
+            kcb.writeInt(msg.kind.id);
+
+            ChannelBuffer body = msg.encodeBody();
+            return ChannelBuffers.wrappedBuffer(kcb, body);
+        }
+    };
+
+    /** Kinds of result, each with its wire id and the codec of its body. */
+    private enum Kind
+    {
+        VOID         (1, Void.subcodec),
+        ROWS         (2, Rows.subcodec),
+        SET_KEYSPACE (3, SetKeyspace.subcodec),
+        PREPARED     (4, Prepared.subcodec);
+
+        public final int id;
+        public final Message.Codec<ResultMessage> subcodec;
+
+        // Lookup table indexed by kind id; slots that match no kind stay null.
+        private static final Kind[] ids;
+        static
+        {
+            int maxId = -1;
+            for (Kind k : Kind.values())
+                maxId = Math.max(maxId, k.id);
+            ids = new Kind[maxId + 1];
+            for (Kind k : Kind.values())
+            {
+                if (ids[k.id] != null)
+                    throw new IllegalStateException("Duplicate kind id");
+                ids[k.id] = k;
+            }
+        }
+
+        private Kind(int id, Message.Codec<ResultMessage> subcodec)
+        {
+            this.id = id;
+            this.subcodec = subcodec;
+        }
+
+        /**
+         * Returns the kind with the given wire id.
+         *
+         * @throws ProtocolException if no kind has that id, including ids
+         * that are negative or past the end of the lookup table
+         */
+        public static Kind fromId(int id)
+        {
+            // The id is read straight from the message body: range-check it so a bogus
+            // value yields a ProtocolException rather than an ArrayIndexOutOfBoundsException.
+            Kind k = (id >= 0 && id < ids.length) ? ids[id] : null;
+            if (k == null)
+                throw new ProtocolException(String.format("Unknown kind id %d in RESULT message", id));
+            return k;
+        }
+    }
+
+    private final Kind kind;
+
+    protected ResultMessage(Kind kind)
+    {
+        super(Message.Type.RESULT);
+        this.kind = kind;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    /** Encodes the kind-specific part of the body, i.e. what follows the kind id. */
+    protected abstract ChannelBuffer encodeBody();
+
+    /** Converts this result to a Thrift {@code CqlResult}; not every kind supports it. */
+    public abstract CqlResult toThriftResult();
+
+    /** Result with an empty body (kind VOID); a single shared instance is used. */
+    public static class Void extends ResultMessage
+    {
+        // Not instantiable from outside: use instance()
+        private Void()
+        {
+            super(Kind.VOID);
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return Void.instance();
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Void;
+                return ChannelBuffers.EMPTY_BUFFER;
+            }
+        };
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        public static Void instance()
+        {
+            return Holder.instance;
+        }
+
+        // Battling java initialization: the instance is only built when Holder is first used by instance()
+        private static class Holder
+        {
+            static final Void instance = new Void();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EMPTY RESULT";
+        }
+    }
+
+    /** Result carrying a keyspace name (kind SET_KEYSPACE). */
+    public static class SetKeyspace extends ResultMessage
+    {
+        private final String keyspace;
+
+        public SetKeyspace(String keyspace)
+        {
+            super(Kind.SET_KEYSPACE);
+            this.keyspace = keyspace;
+        }
+
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                String keyspace = CBUtil.readString(body);
+                return new SetKeyspace(keyspace);
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof SetKeyspace;
+                return CBUtil.stringToCB(((SetKeyspace)msg).keyspace);
+            }
+        };
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return new CqlResult(CqlResultType.VOID);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT set keyspace " + keyspace;
+        }
+    }
+
+    /** Result carrying a {@link ResultSet} (kind ROWS). */
+    public static class Rows extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                return new Rows(ResultSet.codec.decode(body));
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Rows;
+                Rows rowMsg = (Rows)msg;
+                return ResultSet.codec.encode(rowMsg.result);
+            }
+        };
+
+        public final ResultSet result;
+
+        public Rows(ResultSet result)
+        {
+            super(Kind.ROWS);
+            this.result = result;
+        }
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            return result.toThriftResult();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "ROWS " + result;
+        }
+
+    }
+
+    /** Result carrying a prepared statement id followed by its metadata (kind PREPARED). */
+    public static class Prepared extends ResultMessage
+    {
+        public static final Message.Codec<ResultMessage> subcodec = new Message.Codec<ResultMessage>()
+        {
+            public ResultMessage decode(ChannelBuffer body)
+            {
+                int id = body.readInt();
+                return new Prepared(id, ResultSet.Metadata.codec.decode(body));
+            }
+
+            public ChannelBuffer encode(ResultMessage msg)
+            {
+                assert msg instanceof Prepared;
+                Prepared prepared = (Prepared)msg;
+                return ChannelBuffers.wrappedBuffer(CBUtil.intToCB(prepared.statementId), ResultSet.Metadata.codec.encode(prepared.metadata));
+            }
+        };
+
+        public final int statementId;
+        public final ResultSet.Metadata metadata;
+
+        public Prepared(int statementId, List<ColumnSpecification> names)
+        {
+            this(statementId, new ResultSet.Metadata(names));
+        }
+
+        private Prepared(int statementId, ResultSet.Metadata metadata)
+        {
+            super(Kind.PREPARED);
+            this.statementId = statementId;
+            this.metadata = metadata;
+        }
+
+        protected ChannelBuffer encodeBody()
+        {
+            return subcodec.encode(this);
+        }
+
+        public CqlResult toThriftResult()
+        {
+            // Always throws: the Thrift form of this kind is built by toThriftPreparedResult().
+            throw new UnsupportedOperationException();
+        }
+
+        /** Builds the Thrift prepared result with one name and one short type name per metadata column. */
+        public CqlPreparedResult toThriftPreparedResult()
+        {
+            List<String> namesString = new ArrayList<String>(metadata.names.size());
+            List<String> typesString = new ArrayList<String>(metadata.names.size());
+            for (ColumnSpecification name : metadata.names)
+            {
+                namesString.add(name.toString());
+                typesString.add(TypeParser.getShortName(name.type));
+            }
+            return new CqlPreparedResult(statementId, metadata.names.size()).setVariable_types(typesString).setVariable_names(namesString);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "RESULT PREPARED " + statementId + " " + metadata;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
new file mode 100644
index 0000000..0e33da5
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.EnumMap;
+import java.util.Map;
+
+import com.google.common.base.Charsets;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.transport.*;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * The initial message of the protocol.
+ * Sets up a number of connection options.
+ */
+public class StartupMessage extends Message.Request
+{
+    /** Options a client may send at startup, with the reading, writing and sizing of their values. */
+    public enum Option implements OptionCodec.Codecable<Option>
+    {
+        COMPRESSION(1);
+
+        private final int id;
+
+        private Option(int id)
+        {
+            this.id = id;
+        }
+
+        public int getId()
+        {
+            return id;
+        }
+
+        public Object readValue(ChannelBuffer cb)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    return CBUtil.readString(cb);
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public void writeValue(Object value, ChannelBuffer cb)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    assert value instanceof String;
+                    cb.writeBytes(CBUtil.stringToCB((String)value));
+                    break;
+                default:
+                    throw new AssertionError();
+            }
+        }
+
+        public int serializedValueSize(Object value)
+        {
+            switch (this)
+            {
+                case COMPRESSION:
+                    return 2 + ((String)value).getBytes(Charsets.UTF_8).length;
+                default:
+                    throw new AssertionError();
+            }
+        }
+    }
+
+    private static final OptionCodec<Option> optionCodec = new OptionCodec<Option>(Option.class);
+
+    public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
+    {
+        public StartupMessage decode(ChannelBuffer body)
+        {
+            String verString = CBUtil.readString(body);
+
+            Map<Option, Object> options = optionCodec.decode(body);
+            return new StartupMessage(verString, options);
+        }
+
+        public ChannelBuffer encode(StartupMessage msg)
+        {
+            ChannelBuffer vcb = CBUtil.stringToCB(msg.cqlVersion);
+            ChannelBuffer ocb = optionCodec.encode(msg.options);
+            return ChannelBuffers.wrappedBuffer(vcb, ocb);
+        }
+    };
+
+    public final String cqlVersion;
+    public final Map<Option, Object> options;
+
+    public StartupMessage(String cqlVersion, Map<Option, Object> options)
+    {
+        super(Message.Type.STARTUP);
+        this.cqlVersion = cqlVersion;
+        this.options = options;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    /** Applies the CQL version and options to the connection, then answers READY, AUTHENTICATE or an error. */
+    public Message.Response execute()
+    {
+        try
+        {
+            connection.clientState().setCQLVersion(cqlVersion);
+            // NOTE(review): bound is 2.99.0 while the message says 3.0.0, presumably to admit 3.0.0 pre-releases; confirm
+            if (connection.clientState().getCQLVersion().compareTo(new SemanticVersion("2.99.0")) < 0)
+                throw new ProtocolException(String.format("CQL version %s is not supported by the binary protocol (supported versions are >= 3.0.0)", cqlVersion));
+
+            if (options.containsKey(Option.COMPRESSION))
+            {
+                String compression = ((String)options.get(Option.COMPRESSION)).toLowerCase();
+                if (!compression.equals("snappy"))
+                    throw new InvalidRequestException(String.format("Unknown compression algorithm: %s", compression));
+                if (FrameCompressor.SnappyCompressor.instance == null)
+                    throw new InvalidRequestException("This instance does not support Snappy compression");
+                connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
+            }
+
+            if (connection.clientState().isLogged())
+                return new ReadyMessage();
+            else
+                return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
+        }
+        catch (InvalidRequestException e)
+        {
+            return ErrorMessage.fromException(e);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "STARTUP cqlVersion=" + cqlVersion;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
new file mode 100644
index 0000000..aaed1cf
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/SupportedMessage.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport.messages;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import org.apache.cassandra.transport.CBUtil;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.utils.SemanticVersion;
+
+/**
+ * Message listing the supported CQL versions and compression algorithms.
+ */
+public class SupportedMessage extends Message.Response
+{
+    public static final Message.Codec<SupportedMessage> codec = new Message.Codec<SupportedMessage>()
+    {
+        public SupportedMessage decode(ChannelBuffer in)
+        {
+            // Arguments are evaluated left to right: the version list is read first, then the compressions.
+            return new SupportedMessage(CBUtil.readStringList(in), CBUtil.readStringList(in));
+        }
+
+        public ChannelBuffer encode(SupportedMessage supported)
+        {
+            ChannelBuffer out = ChannelBuffers.dynamicBuffer();
+            CBUtil.writeStringList(out, supported.cqlVersions);
+            CBUtil.writeStringList(out, supported.compressions);
+            return out;
+        }
+    };
+
+    public final List<String> cqlVersions;
+    public final List<String> compressions;
+
+    // Starts with two empty, mutable lists.
+    public SupportedMessage()
+    {
+        this(new ArrayList<String>(), new ArrayList<String>());
+    }
+
+    private SupportedMessage(List<String> cqlVersions, List<String> compressions)
+    {
+        super(Message.Type.SUPPORTED);
+        this.cqlVersions = cqlVersions;
+        this.compressions = compressions;
+    }
+
+    public ChannelBuffer encode()
+    {
+        return codec.encode(this);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "SUPPORTED versions=" + cqlVersions + " compressions=" + compressions;
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d3a3eed/test/unit/org/apache/cassandra/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/EmbeddedServer.java b/test/unit/org/apache/cassandra/EmbeddedServer.java
index 4406103..c948cfa 100644
--- a/test/unit/org/apache/cassandra/EmbeddedServer.java
+++ b/test/unit/org/apache/cassandra/EmbeddedServer.java
@@ -55,7 +55,7 @@ public class EmbeddedServer extends SchemaLoader
                 {
                     case Thrift:
                     default:
-                        daemon = new org.apache.cassandra.thrift.CassandraDaemon();
+                        daemon = new org.apache.cassandra.service.CassandraDaemon();
                 }
                 daemon.activate();
             }
