http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/db/marshal/CollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java index 8992aed..d65e3a6 100644 --- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java +++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java @@ -22,23 +22,18 @@ import java.io.IOException; import java.util.List; import java.util.Iterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.Lists; import org.apache.cassandra.cql3.Maps; import org.apache.cassandra.cql3.Sets; -import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -48,10 +43,6 @@ import org.apache.cassandra.utils.ByteBufferUtil; */ public abstract class CollectionType<T> extends AbstractType<T> { - private static final Logger logger = LoggerFactory.getLogger(CollectionType.class); - - public static final int MAX_ELEMENTS = 65535; - public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer(); public enum Kind @@ -148,26 +139,11 @@ public abstract class CollectionType<T> extends AbstractType<T> return values.size(); } - protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version) - { - assert isMultiCell(); - - int size = collectionSize(values); - if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS) - return size; - - logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" + - " elements will be returned to the client. Please see " + - "http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.", - def.ksName, def.cfName, values.size(), MAX_ELEMENTS, MAX_ELEMENTS); - return MAX_ELEMENTS; - } - public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version) { assert isMultiCell(); List<ByteBuffer> values = serializedValues(cells); - int size = enforceLimit(def, values, version); + int size = collectionSize(values); return CollectionSerializer.pack(values, size, version); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/serializers/CollectionSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java index 5fb3e0a..3d6be67 100644 --- a/src/java/org/apache/cassandra/serializers/CollectionSerializer.java +++ b/src/java/org/apache/cassandra/serializers/CollectionSerializer.java @@ -50,11 +50,6 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> return deserializeForNativeProtocol(bytes, Server.VERSION_3); } - public ByteBuffer reserializeToV3(ByteBuffer bytes) - { - return serialize(deserializeForNativeProtocol(bytes, 2)); - } - public void validate(ByteBuffer bytes) throws MarshalException { // Same thing as above @@ -76,69 +71,42 @@ public abstract class CollectionSerializer<T> implements TypeSerializer<T> protected static void writeCollectionSize(ByteBuffer output, int elements, int version) { - if (version >= Server.VERSION_3) output.putInt(elements); - else - output.putShort((short)elements); } public static int readCollectionSize(ByteBuffer input, int version) { - return version >= Server.VERSION_3 ? input.getInt() : ByteBufferUtil.readShortLength(input); + return input.getInt(); } protected static int sizeOfCollectionSize(int elements, int version) { - return version >= Server.VERSION_3 ? 4 : 2; + return 4; } public static void writeValue(ByteBuffer output, ByteBuffer value, int version) { - if (version >= Server.VERSION_3) + if (value == null) { - if (value == null) - { - output.putInt(-1); - return; - } - - output.putInt(value.remaining()); - output.put(value.duplicate()); - } - else - { - assert value != null; - output.putShort((short)value.remaining()); - output.put(value.duplicate()); + output.putInt(-1); + return; } + + output.putInt(value.remaining()); + output.put(value.duplicate()); } public static ByteBuffer readValue(ByteBuffer input, int version) { - if (version >= Server.VERSION_3) - { - int size = input.getInt(); - if (size < 0) - return null; + int size = input.getInt(); + if (size < 0) + return null; - return ByteBufferUtil.readBytes(input, size); - } - else - { - return ByteBufferUtil.readBytesWithShortLength(input); - } + return ByteBufferUtil.readBytes(input, size); } public static int sizeOfValue(ByteBuffer value, int version) { - if (version >= Server.VERSION_3) - { - return value == null ? 4 : 4 + value.remaining(); - } - else - { - assert value != null; - return 2 + value.remaining(); - } + return value == null ? 4 : 4 + value.remaining(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 9cd1653..14cd812 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -2286,7 +2286,7 @@ public class CassandraServer implements Cassandra.Iface ThriftClientState cState = state(); return ClientState.getCQLQueryHandler().process(queryString, cState.getQueryState(), - QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), + QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), Collections.<ByteBuffer>emptyList()), null).toThriftResult(); } @@ -2358,7 +2358,7 @@ public class CassandraServer implements Cassandra.Iface return ClientState.getCQLQueryHandler().processPrepared(prepared.statement, cState.getQueryState(), - QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables), + QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables), null).toThriftResult(); } catch (RequestExecutionException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Event.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Event.java b/src/java/org/apache/cassandra/transport/Event.java index 9b6fdd4..b7f4650 100644 --- a/src/java/org/apache/cassandra/transport/Event.java +++ b/src/java/org/apache/cassandra/transport/Event.java @@ -21,7 +21,6 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Iterator; import java.util.List; -import java.util.UUID; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; @@ -29,9 +28,9 @@ import io.netty.buffer.ByteBuf; public abstract class Event { public enum Type { - TOPOLOGY_CHANGE(Server.VERSION_1), - STATUS_CHANGE(Server.VERSION_1), - SCHEMA_CHANGE(Server.VERSION_1), + TOPOLOGY_CHANGE(Server.VERSION_3), + STATUS_CHANGE(Server.VERSION_3), + SCHEMA_CHANGE(Server.VERSION_3), TRACE_COMPLETE(Server.VERSION_4); public final int minimumVersion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Frame.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 14fe589..04cc95e 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -27,7 +27,6 @@ import io.netty.channel.*; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageEncoder; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.transport.messages.ErrorMessage; @@ -50,16 +49,6 @@ public class Frame * +---------+---------+---------+---------+---------+ * | length | * +---------+---------+---------+---------+ - * - * - * In versions 1 and 2 the header has a smaller (1 byte) stream id, and is thus defined the following way: - * - * 0 8 16 24 32 - * +---------+---------+---------+---------+ - * | version | flags | stream | opcode | - * +---------+---------+---------+---------+ - * | length | - * +---------+---------+---------+---------+ */ private Frame(Header header, ByteBuf body) { @@ -85,9 +74,8 @@ public class Frame public static class Header { - // 8 bytes in protocol versions 1 and 2, 8 bytes in protocol version 3 and later - public static final int MODERN_LENGTH = 9; - public static final int LEGACY_LENGTH = 8; + // 9 bytes in protocol version 3 and later + public static final int LENGTH = 9; public static final int BODY_LENGTH_SIZE = 4; @@ -174,8 +162,8 @@ public class Frame return; } - // Wait until we have read at least the short header - if (buffer.readableBytes() < Header.LEGACY_LENGTH) + // Wait until we have the complete header + if (buffer.readableBytes() < Header.LENGTH) return; int idx = buffer.readerIndex(); @@ -184,29 +172,14 @@ public class Frame Message.Direction direction = Message.Direction.extractFromVersion(firstByte); int version = firstByte & PROTOCOL_VERSION_MASK; - if (version > Server.CURRENT_VERSION) - throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); highest supported is %d ", - version, Server.CURRENT_VERSION)); - - // Wait until we have the complete V3+ header - if (version >= Server.VERSION_3 && buffer.readableBytes() < Header.MODERN_LENGTH) - return; + if (version < Server.MIN_SUPPORTED_VERSION || version > Server.CURRENT_VERSION) + throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d", + version, Server.MIN_SUPPORTED_VERSION, Server.CURRENT_VERSION)); int flags = buffer.getByte(idx++); - int streamId, headerLength; - if (version >= Server.VERSION_3) - { - streamId = buffer.getShort(idx); - idx += 2; - headerLength = Header.MODERN_LENGTH; - } - else - { - streamId = buffer.getByte(idx); - idx++; - headerLength = Header.LEGACY_LENGTH; - } + int streamId = buffer.getShort(idx); + idx += 2; // This throws a protocol exceptions if the opcode is unknown Message.Type type; @@ -222,13 +195,7 @@ public class Frame long bodyLength = buffer.getUnsignedInt(idx); idx += Header.BODY_LENGTH_SIZE; - if (bodyLength < 0) - { - buffer.skipBytes(headerLength); - throw ErrorMessage.wrap(new ProtocolException("Invalid frame body length: " + bodyLength), streamId); - } - - long frameLength = bodyLength + headerLength; + long frameLength = bodyLength + Header.LENGTH; if (frameLength > MAX_FRAME_LENGTH) { // Enter the discard mode and discard everything received so far. @@ -295,10 +262,7 @@ public class Frame public void encode(ChannelHandlerContext ctx, Frame frame, List<Object> results) throws IOException { - int headerLength = frame.header.version >= Server.VERSION_3 - ? Header.MODERN_LENGTH - : Header.LEGACY_LENGTH; - ByteBuf header = CBUtil.allocator.buffer(headerLength); + ByteBuf header = CBUtil.allocator.buffer(Header.LENGTH); Message.Type type = frame.header.type; header.writeByte(type.direction.addToVersion(frame.header.version)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/Server.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index cafc0ce..a9c9bee 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -63,11 +63,10 @@ public class Server implements CassandraDaemon.Server private static final Logger logger = LoggerFactory.getLogger(Server.class); private static final boolean useEpoll = NativeTransportService.useEpoll(); - public static final int VERSION_1 = 1; - public static final int VERSION_2 = 2; public static final int VERSION_3 = 3; public static final int VERSION_4 = 4; public static final int CURRENT_VERSION = VERSION_4; + public static final int MIN_SUPPORTED_VERSION = VERSION_3; private final ConnectionTracker connectionTracker = new ConnectionTracker(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/BatchMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java index 2db380b..5baf1a6 100644 --- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java @@ -45,9 +45,6 @@ public class BatchMessage extends Message.Request { public BatchMessage decode(ByteBuf body, int version) { - if (version == 1) - throw new ProtocolException("BATCH messages are not support in version 1 of the protocol"); - byte type = body.readByte(); int n = body.readUnsignedShort(); List<Object> queryOrIds = new ArrayList<>(n); @@ -63,9 +60,7 @@ public class BatchMessage extends Message.Request throw new ProtocolException("Invalid query kind in BATCH messages. Must be 0 or 1 but got " + kind); variables.add(CBUtil.readValueList(body, version)); } - QueryOptions options = version < 3 - ? QueryOptions.fromPreV3Batch(CBUtil.readConsistencyLevel(body)) - : QueryOptions.codec.decode(body, version); + QueryOptions options = QueryOptions.codec.decode(body, version); return new BatchMessage(toType(type), queryOrIds, variables, options); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java index 718595c..940a0fc 100644 --- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.transport.messages; -import java.nio.ByteBuffer; -import java.util.List; import java.util.UUID; import com.google.common.collect.ImmutableMap; @@ -28,7 +26,6 @@ import org.apache.cassandra.cql3.CQLStatement; import org.apache.cassandra.cql3.QueryHandler; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.statements.ParsedStatement; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.exceptions.PreparedQueryNotFoundException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; @@ -45,16 +42,7 @@ public class ExecuteMessage extends Message.Request public ExecuteMessage decode(ByteBuf body, int version) { byte[] id = CBUtil.readBytes(body); - if (version == 1) - { - List<ByteBuffer> values = CBUtil.readValueList(body, version); - ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); - return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.fromProtocolV1(consistency, values)); - } - else - { - return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version)); - } + return new ExecuteMessage(MD5Digest.wrap(id), QueryOptions.codec.decode(body, version)); } public void encode(ExecuteMessage msg, ByteBuf dest, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/src/java/org/apache/cassandra/transport/messages/QueryMessage.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java index 67f3734..3b48d52 100644 --- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java +++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java @@ -17,20 +17,20 @@ */ package org.apache.cassandra.transport.messages; -import java.nio.ByteBuffer; -import java.util.Collections; import java.util.UUID; import com.google.common.collect.ImmutableMap; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBuf; import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.db.ConsistencyLevel; -import org.apache.cassandra.exceptions.*; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.CBUtil; +import org.apache.cassandra.transport.Message; +import org.apache.cassandra.transport.ProtocolException; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.UUIDGen; @@ -44,15 +44,7 @@ public class QueryMessage extends Message.Request public QueryMessage decode(ByteBuf body, int version) { String query = CBUtil.readLongString(body); - if (version == 1) - { - ConsistencyLevel consistency = CBUtil.readConsistencyLevel(body); - return new QueryMessage(query, QueryOptions.fromProtocolV1(consistency, Collections.<ByteBuffer>emptyList())); - } - else - { - return new QueryMessage(query, QueryOptions.codec.decode(body, version)); - } + return new QueryMessage(query, QueryOptions.codec.decode(body, version)); } public void encode(QueryMessage msg, ByteBuf dest, int version) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 3d3729a..25dfc28 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -32,12 +32,16 @@ import java.util.concurrent.atomic.AtomicInteger; import com.datastax.driver.core.*; import com.datastax.driver.core.ResultSet; + import com.google.common.base.Objects; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; + import org.junit.*; + +import com.datastax.driver.core.Cluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CFMetaData; @@ -52,7 +56,6 @@ import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.marshal.TupleType; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.service.ClientState; @@ -62,7 +65,6 @@ import org.apache.cassandra.transport.Event; import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; - import static junit.framework.Assert.assertNotNull; /** @@ -81,30 +83,29 @@ public abstract class CQLTester private static org.apache.cassandra.transport.Server server; protected static final int nativePort; protected static final InetAddress nativeAddr; - private static final Cluster[] cluster; - private static final Session[] session; + private static final Map<Integer, Cluster> clusters = new HashMap<>(); + private static final Map<Integer, Session> sessions = new HashMap<>(); private static boolean isServerPrepared = false; - public static int maxProtocolVersion; + public static final List<Integer> PROTOCOL_VERSIONS; static { - int version; - for (version = 1; version <= Server.CURRENT_VERSION; ) + // The latest versions might not be supported yet by the java driver + ImmutableList.Builder<Integer> builder = ImmutableList.builder(); + for (int version = Server.MIN_SUPPORTED_VERSION; version <= Server.CURRENT_VERSION; version++) { try { - ProtocolVersion.fromInt(++version); + ProtocolVersion.fromInt(version); + builder.add(version); } catch (IllegalArgumentException e) { - version--; break; } } - maxProtocolVersion = version; - cluster = new Cluster[maxProtocolVersion]; - session = new Session[maxProtocolVersion]; + PROTOCOL_VERSIONS = builder.build(); // Once per-JVM is enough prepareServer(true); @@ -227,11 +228,9 @@ public abstract class CQLTester @AfterClass public static void tearDownClass() { - for (Session sess : session) - if (sess != null) + for (Session sess : sessions.values()) sess.close(); - for (Cluster cl : cluster) - if (cl != null) + for (Cluster cl : clusters.values()) cl.close(); if (server != null) @@ -319,17 +318,19 @@ public abstract class CQLTester server = new Server.Builder().withHost(nativeAddr).withPort(nativePort).build(); server.start(); - for (int version = 1; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { - if (cluster[version-1] != null) + if (clusters.containsKey(version)) continue; - cluster[version-1] = Cluster.builder().addContactPoints(nativeAddr) - .withClusterName("Test Cluster") - .withPort(nativePort) - .withProtocolVersion(ProtocolVersion.fromInt(version)) - .build(); - session[version-1] = cluster[version-1].connect(); + Cluster cluster = Cluster.builder() + .addContactPoints(nativeAddr) + .withClusterName("Test Cluster") + .withPort(nativePort) + .withProtocolVersion(ProtocolVersion.fromInt(version)) + .build(); + clusters.put(version, cluster); + sessions.put(version, cluster.connect()); logger.info("Started Java Driver instance for protocol version {}", version); } @@ -623,16 +624,19 @@ public abstract class CQLTester protected com.datastax.driver.core.ResultSet executeNet(int protocolVersion, String query, Object... values) throws Throwable { - requireNetwork(); + return sessionNet(protocolVersion).execute(formatQuery(query), values); + } - return session[protocolVersion-1].execute(formatQuery(query), values); + protected Session sessionNet() + { + return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1)); } protected Session sessionNet(int protocolVersion) { requireNetwork(); - return session[protocolVersion-1]; + return sessions.get(protocolVersion); } private String formatQuery(String query) @@ -696,9 +700,9 @@ public abstract class CQLTester for (int j = 0; j < meta.size(); j++) { DataType type = meta.getType(j); - com.datastax.driver.core.TypeCodec<Object> codec = cluster[protocolVersion -1].getConfiguration() - .getCodecRegistry() - .codecFor(type); + com.datastax.driver.core.TypeCodec<Object> codec = clusters.get(protocolVersion).getConfiguration() + .getCodecRegistry() + .codecFor(type); ByteBuffer expectedByteValue = codec.serialize(expected[j], ProtocolVersion.fromInt(protocolVersion)); int expectedBytes = expectedByteValue.remaining(); ByteBuffer actualValue = actual.getBytesUnsafe(meta.getName(j)); @@ -1237,7 +1241,7 @@ public abstract class CQLTester protected com.datastax.driver.core.TupleType tupleTypeOf(int protocolVersion, DataType...types) { requireNetwork(); - return cluster[protocolVersion -1].getMetadata().newTupleType(types); + return clusters.get(protocolVersion).getMetadata().newTupleType(types); } // Attempt to find an AbstracType from a value (for serialization/printing sake). http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java index 0d590b2..a947593 100644 --- a/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java +++ b/test/unit/org/apache/cassandra/cql3/IndexQueryPagingTest.java @@ -3,7 +3,6 @@ package org.apache.cassandra.cql3; import org.junit.Test; import com.datastax.driver.core.Session; -import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.Statement; import static org.junit.Assert.assertEquals; @@ -77,7 +76,7 @@ public class IndexQueryPagingTest extends CQLTester // setting the fetch size < than the row count. Assert // that all rows are returned, so we know that paging // of the results was involved. - Session session = sessionNet(maxProtocolVersion); + Session session = sessionNet(); Statement stmt = session.newSimpleStatement(String.format(cql, KEYSPACE + "." + currentTable())); stmt.setFetchSize(rowCount - 1); assertEquals(rowCount, session.execute(stmt).all().size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java index f43e335..ef06dbf 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFPureScriptTest.java @@ -89,7 +89,7 @@ public class UFPureScriptTest extends CQLTester assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"), row(list, set, map)); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) assertRowsNet(version, executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"), row(list, set, map)); @@ -193,7 +193,7 @@ public class UFPureScriptTest extends CQLTester DataType.map(DataType.cint(), DataType.cboolean())); TupleValue tup = tType.newValue(1d, list, set, map); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { assertRowsNet(version, executeNet(version, "SELECT " + fTup1 + "(tup) FROM %s WHERE key = 1"), @@ -303,7 +303,7 @@ public class UFPureScriptTest extends CQLTester row("three", "one", "two")); // same test - but via native protocol - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) assertRowsNet(version, executeNet(version, cqlSelect), row("three", "one", "two")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java index 6bd03ad..ce50767 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/UFTest.java @@ -29,7 +29,6 @@ import java.util.TreeSet; import java.util.UUID; import java.security.AccessControlException; -import com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Test; @@ -932,7 +931,7 @@ public class UFTest extends CQLTester row(list, set, map)); // same test - but via native protocol - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) assertRowsNet(version, executeNet(version, "SELECT " + fList + "(lst), " + fSet + "(st), " + fMap + "(mp) FROM %s WHERE key = 1"), row(list, set, map)); @@ -1041,7 +1040,7 @@ public class UFTest extends CQLTester Assert.assertNull(row.getBytes("t")); Assert.assertNull(row.getBytes("u")); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { Row r = executeNet(version, "SELECT " + fList + "(lst) as l, " + @@ -1168,7 +1167,7 @@ public class UFTest extends CQLTester DataType.set(DataType.text()), DataType.map(DataType.cint(), DataType.cboolean())); TupleValue tup = tType.newValue(1d, list, set, map); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { assertRowsNet(version, executeNet(version, "SELECT " + fTup0 + "(tup) FROM %s WHERE key = 1"), @@ -1195,7 +1194,7 @@ public class UFTest extends CQLTester createTable("CREATE TABLE %s (key int primary key, udt frozen<" + KEYSPACE + '.' + type + ">)"); execute("INSERT INTO %s (key, udt) VALUES (1, {txt: 'one', i:1})"); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { executeNet(version, "USE " + KEYSPACE); @@ -1259,7 +1258,7 @@ public class UFTest extends CQLTester assertRows(execute("SELECT " + fUdt2 + "(udt) FROM %s WHERE key = 1"), row(1)); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { List<Row> rowsNet = executeNet(version, "SELECT " + fUdt0 + "(udt) FROM %s WHERE key = 1").all(); Assert.assertEquals(1, rowsNet.size()); @@ -1521,7 +1520,7 @@ public class UFTest extends CQLTester assertRows(execute("SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"), row("three", "one", "two")); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) assertRowsNet(version, executeNet(version, "SELECT " + fName1 + "(lst), " + fName2 + "(st), " + fName3 + "(mp) FROM %s WHERE key = 1"), row("three", "one", "two")); @@ -1777,7 +1776,7 @@ public class UFTest extends CQLTester "LANGUAGE JAVA\n" + "AS 'throw new RuntimeException();'"); - for (int version = Server.VERSION_2; version <= maxProtocolVersion; version++) + for (int version : PROTOCOL_VERSIONS) { try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/service/ClientWarningsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java index 7716b4c..5cdeb78 100644 --- a/test/unit/org/apache/cassandra/service/ClientWarningsTest.java +++ b/test/unit/org/apache/cassandra/service/ClientWarningsTest.java @@ -18,20 +18,19 @@ package org.apache.cassandra.service; import org.apache.commons.lang3.StringUtils; + import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.SimpleClient; import org.apache.cassandra.transport.messages.QueryMessage; import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNull; public class ClientWarningsTest extends CQLTester { @@ -77,21 +76,6 @@ public class ClientWarningsTest extends CQLTester } } - @Test - public void testLargeBatchWithProtoV2() throws Exception - { - createTable("CREATE TABLE %s (pk int PRIMARY KEY, v text)"); - - try (SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(), nativePort, Server.VERSION_2)) - { - client.connect(false); - - QueryMessage query = new QueryMessage(createBatchStatement(DatabaseDescriptor.getBatchSizeWarnThreshold()), QueryOptions.DEFAULT); - Message.Response resp = client.execute(query); - assertNull(resp.getWarnings()); - } - } - private String createBatchStatement(int minSize) { return String.format("BEGIN UNLOGGED BATCH INSERT INTO %s.%s (pk, v) VALUES (1, '%s') APPLY BATCH;", http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java index 9910167..f47f355 100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java @@ -20,25 +20,36 @@ package org.apache.cassandra.transport; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import org.apache.cassandra.transport.messages.ErrorMessage; + import org.junit.Assert; import org.junit.Test; import java.util.ArrayList; import java.util.List; +import static org.apache.cassandra.transport.Message.Direction.*; + public class ProtocolErrorTest { @Test public void testInvalidProtocolVersion() throws Exception { + // test using a protocol version higher than the current version + testInvalidProtocolVersion(Server.CURRENT_VERSION + 1); + // test using a protocol version lower than the lowest version + testInvalidProtocolVersion(Server.MIN_SUPPORTED_VERSION - 1); + + } + + public void testInvalidProtocolVersion(int version) throws Exception + { Frame.Decoder dec = new Frame.Decoder(null); List<Object> results = new ArrayList<>(); - // should generate a protocol exception for using a protocol version higher than the current version byte[] frame = new byte[] { - (byte) ((Server.CURRENT_VERSION + 1) & Frame.PROTOCOL_VERSION_MASK), // direction & version + (byte) REQUEST.addToVersion(version), // direction & version 0x00, // flags - 0x01, // stream ID + 0x00, 0x01, // stream ID 0x09, // opcode 0x00, 0x00, 0x00, 0x21, // body length 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45, @@ -52,6 +63,7 @@ public class ProtocolErrorTest { dec.decode(null, buf, results); Assert.fail("Expected protocol error"); } catch (ProtocolException e) { + Assert.assertTrue(e.getMessage().contains("Invalid or unsupported protocol version")); } } @@ -64,9 +76,9 @@ public class ProtocolErrorTest { // should generate a protocol exception for using a response frame with // a prepare op, ensure that it comes back with stream ID 1 byte[] frame = new byte[] { - (byte) 0x82, // direction & version + (byte) RESPONSE.addToVersion(Server.CURRENT_VERSION), // direction & version 0x00, // flags - 0x01, // stream ID + 0x00, 0x01, // stream ID 0x09, // opcode 0x00, 0x00, 0x00, 0x21, // body length 0x00, 0x00, 0x00, 0x1b, 0x00, 0x1b, 0x53, 0x45, @@ -82,29 +94,7 @@ public class ProtocolErrorTest { } catch (ErrorMessage.WrappedException e) { // make sure the exception has the correct stream ID Assert.assertEquals(1, e.getStreamId()); - } - } - - @Test - public void testNegativeBodyLength() throws Exception - { - Frame.Decoder dec = new Frame.Decoder(null); - - List<Object> results = new ArrayList<>(); - byte[] frame = new byte[] { - (byte) 0x82, // direction & version - 0x00, // flags - 0x01, // stream ID - 0x09, // opcode - (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length (-1) - }; - ByteBuf buf = Unpooled.wrappedBuffer(frame); - try { - dec.decode(null, buf, results); - Assert.fail("Expected protocol error"); - } catch (ErrorMessage.WrappedException e) { - // make sure the exception has the correct stream ID - Assert.assertEquals(1, e.getStreamId()); + Assert.assertTrue(e.getMessage().contains("Wrong protocol direction")); } } @@ -115,19 +105,21 @@ public class ProtocolErrorTest { List<Object> results = new ArrayList<>(); byte[] frame = new byte[] { - (byte) 0x82, // direction & version + (byte) REQUEST.addToVersion(Server.CURRENT_VERSION), // direction & version 0x00, // flags - 0x01, // stream ID + 0x00, 0x01, // stream ID 0x09, // opcode - 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff, // body length + 0x10, (byte) 0x00, (byte) 0x00, (byte) 0x00, // body length }; - ByteBuf buf = Unpooled.wrappedBuffer(frame); + byte[] body = new byte[0x10000000]; + ByteBuf buf = Unpooled.wrappedBuffer(frame, body); try { dec.decode(null, buf, results); Assert.fail("Expected protocol error"); } catch (ErrorMessage.WrappedException e) { // make sure the exception has the correct stream ID Assert.assertEquals(1, e.getStreamId()); + Assert.assertTrue(e.getMessage().contains("Request is too big")); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8439e74e/test/unit/org/apache/cassandra/transport/SerDeserTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/transport/SerDeserTest.java b/test/unit/org/apache/cassandra/transport/SerDeserTest.java index 352327e..fdb346e 100644 --- a/test/unit/org/apache/cassandra/transport/SerDeserTest.java +++ b/test/unit/org/apache/cassandra/transport/SerDeserTest.java @@ -25,7 +25,6 @@ import io.netty.buffer.ByteBuf; import org.junit.Test; import org.apache.cassandra.cql3.*; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.transport.Event.TopologyChange; @@ -46,8 +45,8 @@ public class SerDeserTest @Test public void collectionSerDeserTest() throws Exception { - collectionSerDeserTest(2); collectionSerDeserTest(3); + collectionSerDeserTest(4); } public void collectionSerDeserTest(int version) throws Exception @@ -93,7 +92,6 @@ public class SerDeserTest @Test public void eventSerDeserTest() throws Exception { - eventSerDeserTest(2); eventSerDeserTest(3); eventSerDeserTest(4); } @@ -173,8 +171,8 @@ public class SerDeserTest @Test public void udtSerDeserTest() throws Exception { - udtSerDeserTest(2); udtSerDeserTest(3); + udtSerDeserTest(4); } public void udtSerDeserTest(int version) throws Exception @@ -200,10 +198,6 @@ public class SerDeserTest Term t = u.prepare("ks", columnSpec("myValue", udt)); QueryOptions options = QueryOptions.DEFAULT; - if (version == 2) - options = QueryOptions.fromProtocolV2(ConsistencyLevel.ONE, Collections.<ByteBuffer>emptyList()); - else if (version != 3) - throw new AssertionError("Invalid protocol version for test"); ByteBuffer serialized = t.bindAndGet(options);
