Repository: cassandra Updated Branches: refs/heads/trunk 639d4b240 -> a59be2693
Expand use of vints patch by Sylvain Lebresne; reviewed by Aleksey Yeschenko for CASSANDRA-9801 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a59be269 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a59be269 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a59be269 Branch: refs/heads/trunk Commit: a59be2693e33e899264bc29b37b669d4bb645a74 Parents: 639d4b2 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Jul 14 14:23:28 2015 +0200 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Tue Jul 28 17:17:52 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 4 +- src/java/org/apache/cassandra/db/Columns.java | 16 ++++---- src/java/org/apache/cassandra/db/Mutation.java | 28 +++++++++++--- .../org/apache/cassandra/db/ReadResponse.java | 15 +++----- .../cassandra/db/SerializationHeader.java | 24 ++++++------ src/java/org/apache/cassandra/db/Slices.java | 6 +-- .../db/filter/ClusteringIndexNamesFilter.java | 6 +-- .../cassandra/db/filter/ColumnFilter.java | 6 +-- .../apache/cassandra/db/filter/DataLimits.java | 40 ++++++++++---------- .../apache/cassandra/db/filter/RowFilter.java | 12 +++--- .../rows/UnfilteredRowIteratorSerializer.java | 6 +-- .../cassandra/db/rows/UnfilteredSerializer.java | 16 ++++---- 12 files changed, 97 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2db4115..4608492 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,8 +5,8 @@ * Metrics should use up to date nomenclature (CASSANDRA-9448) * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384) * Cleanup crc and adler code for java 8 (CASSANDRA-9650) - * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825, 9848, - 9705, 9859, 9867, 9874, 9828, 9801) + * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825, + 9848, 9705, 9859, 9867, 9874, 9828, 9801) * Update Guava to 18.0 (CASSANDRA-9653) * Bloom filter false positive ratio is not honoured (CASSANDRA-8413) * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 48a4504..03d2e14 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db; -import java.io.DataInput; import java.io.IOException; import java.util.*; import java.util.function.Predicate; @@ -31,6 +30,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -526,26 +526,26 @@ public class Columns implements Iterable<ColumnDefinition> { public void serialize(Columns columns, DataOutputPlus out) throws IOException { - out.writeShort(columns.columnCount()); + out.writeVInt(columns.columnCount()); for (ColumnDefinition column : columns) - ByteBufferUtil.writeWithShortLength(column.name.bytes, out); + ByteBufferUtil.writeWithVIntLength(column.name.bytes, out); } public long serializedSize(Columns columns) { - long size = TypeSizes.sizeof((short)columns.columnCount()); + long size = TypeSizes.sizeofVInt(columns.columnCount()); for (ColumnDefinition column : columns) - size += TypeSizes.sizeofWithShortLength(column.name.bytes); + size += ByteBufferUtil.serializedSizeWithVIntLength(column.name.bytes); return size; } - public Columns deserialize(DataInput in, CFMetaData metadata) throws IOException + public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException { - int length = in.readUnsignedShort(); + int length = (int)in.readVInt(); ColumnDefinition[] columns = new ColumnDefinition[length]; for (int i = 0; i < length; i++) { - ByteBuffer name = ByteBufferUtil.readWithShortLength(in); + ByteBuffer name = ByteBufferUtil.readWithVIntLength(in); ColumnDefinition column = metadata.getColumnDefinition(name); if (column == null) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/Mutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index aca6622..3d49ca6 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -248,12 +248,19 @@ public class Mutation implements IMutation if (version < MessagingService.VERSION_20) out.writeUTF(mutation.getKeyspaceName()); + /* serialize the modifications in the mutation */ + int size = mutation.modifications.size(); + if (version < MessagingService.VERSION_30) + { ByteBufferUtil.writeWithShortLength(mutation.key().getKey(), out); + out.writeInt(size); + } + else + { + out.writeVInt(size); + } - /* serialize the modifications in the mutation */ - int size = mutation.modifications.size(); - out.writeInt(size); assert size > 0; for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet()) PartitionUpdate.serializer.serialize(entry.getValue(), out, version); @@ -266,10 +273,17 @@ public class Mutation implements IMutation keyspaceName = in.readUTF(); DecoratedKey key = null; + int size; if (version < MessagingService.VERSION_30) + { key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); + size = in.readInt(); + } + else + { + size = (int)in.readVInt(); + } - int size = in.readInt(); assert size > 0; if (size == 1) @@ -307,9 +321,13 @@ public class Mutation implements IMutation { int keySize = mutation.key().getKey().remaining(); size += TypeSizes.sizeof((short) keySize) + keySize; + size += TypeSizes.sizeof(mutation.modifications.size()); + } + else + { + size += TypeSizes.sizeofVInt(mutation.modifications.size()); } - size += TypeSizes.sizeof(mutation.modifications.size()); for (Map.Entry<UUID, PartitionUpdate> entry : mutation.modifications.entrySet()) size += PartitionUpdate.serializer.serializedSize(entry.getValue(), version); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 740423a..90bd21d 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -20,8 +20,6 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.ArrayList; -import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.rows.*; @@ -165,14 +163,14 @@ public abstract class ReadResponse } boolean isDigest = response.isDigestQuery(); - ByteBufferUtil.writeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out); + ByteBufferUtil.writeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out); if (!isDigest) { // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the // version, we'll have to deserialize/re-serialize the data to be in the proper version. assert version == MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; - ByteBufferUtil.writeWithLength(data, out); + ByteBufferUtil.writeWithVIntLength(data, out); } } @@ -184,12 +182,12 @@ public abstract class ReadResponse throw new UnsupportedOperationException(); } - ByteBuffer digest = ByteBufferUtil.readWithShortLength(in); + ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in); if (digest.hasRemaining()) return new DigestResponse(digest); assert version == MessagingService.VERSION_30; - ByteBuffer data = ByteBufferUtil.readWithLength(in); + ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); return new DataResponse(data); } @@ -202,15 +200,14 @@ public abstract class ReadResponse } boolean isDigest = response.isDigestQuery(); - long size = ByteBufferUtil.serializedSizeWithShortLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER); - + long size = ByteBufferUtil.serializedSizeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER); if (!isDigest) { // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the // version, we'll have to deserialize/re-serialize the data to be in the proper version. assert version == MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; - size += ByteBufferUtil.serializedSizeWithLength(data); + size += ByteBufferUtil.serializedSizeWithVIntLength(data); } return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index c054b25..2326f1e 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -410,7 +410,7 @@ public class SerializationHeader EncodingStats.serializer.serialize(header.stats, out); writeType(header.keyType, out); - out.writeShort(header.clusteringTypes.size()); + out.writeVInt(header.clusteringTypes.size()); for (AbstractType<?> type : header.clusteringTypes) writeType(type, out); @@ -424,7 +424,7 @@ public class SerializationHeader EncodingStats stats = EncodingStats.serializer.deserialize(in); AbstractType<?> keyType = readType(in); - int size = in.readUnsignedShort(); + int size = (int)in.readVInt(); List<AbstractType<?>> clusteringTypes = new ArrayList<>(size); for (int i = 0; i < size; i++) clusteringTypes.add(readType(in)); @@ -444,7 +444,7 @@ public class SerializationHeader int size = EncodingStats.serializer.serializedSize(header.stats); size += sizeofType(header.keyType); - size += TypeSizes.sizeof((short)header.clusteringTypes.size()); + size += TypeSizes.sizeofVInt(header.clusteringTypes.size()); for (AbstractType<?> type : header.clusteringTypes) size += sizeofType(type); @@ -455,20 +455,20 @@ public class SerializationHeader private void writeColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns, DataOutputPlus out) throws IOException { - out.writeShort(columns.size()); + out.writeVInt(columns.size()); for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet()) { - ByteBufferUtil.writeWithShortLength(entry.getKey(), out); + ByteBufferUtil.writeWithVIntLength(entry.getKey(), out); writeType(entry.getValue(), out); } } private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> columns) { - long size = TypeSizes.sizeof((short)columns.size()); + long size = TypeSizes.sizeofVInt(columns.size()); for (Map.Entry<ByteBuffer, AbstractType<?>> entry : columns.entrySet()) { - size += TypeSizes.sizeofWithShortLength(entry.getKey()); + size += ByteBufferUtil.serializedSizeWithVIntLength(entry.getKey()); size += sizeofType(entry.getValue()); } return size; @@ -476,10 +476,10 @@ public class SerializationHeader private void readColumnsWithType(DataInputPlus in, Map<ByteBuffer, AbstractType<?>> typeMap) throws IOException { - int length = in.readUnsignedShort(); + int length = (int)in.readVInt(); for (int i = 0; i < length; i++) { - ByteBuffer name = ByteBufferUtil.readWithShortLength(in); + ByteBuffer name = ByteBufferUtil.readWithVIntLength(in); typeMap.put(name, readType(in)); } } @@ -487,18 +487,18 @@ public class SerializationHeader private void writeType(AbstractType<?> type, DataOutputPlus out) throws IOException { // TODO: we should have a terser serializaion format. Not a big deal though - ByteBufferUtil.writeWithLength(UTF8Type.instance.decompose(type.toString()), out); + ByteBufferUtil.writeWithVIntLength(UTF8Type.instance.decompose(type.toString()), out); } private AbstractType<?> readType(DataInputPlus in) throws IOException { - ByteBuffer raw = ByteBufferUtil.readWithLength(in); + ByteBuffer raw = ByteBufferUtil.readWithVIntLength(in); return TypeParser.parse(UTF8Type.instance.compose(raw)); } private int sizeofType(AbstractType<?> type) { - return TypeSizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString())); + return ByteBufferUtil.serializedSizeWithVIntLength(UTF8Type.instance.decompose(type.toString())); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/Slices.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slices.java b/src/java/org/apache/cassandra/db/Slices.java index 32ca06d..9dd4a48 100644 --- a/src/java/org/apache/cassandra/db/Slices.java +++ b/src/java/org/apache/cassandra/db/Slices.java @@ -288,7 +288,7 @@ public abstract class Slices implements Iterable<Slice> public void serialize(Slices slices, DataOutputPlus out, int version) throws IOException { int size = slices.size(); - out.writeInt(size); + out.writeVInt(size); if (size == 0) return; @@ -303,7 +303,7 @@ public abstract class Slices implements Iterable<Slice> public long serializedSize(Slices slices, int version) { - long size = TypeSizes.sizeof(slices.size()); + long size = TypeSizes.sizeofVInt(slices.size()); if (slices.size() == 0) return size; @@ -320,7 +320,7 @@ public abstract class Slices implements Iterable<Slice> public Slices deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { - int size = in.readInt(); + int size = (int)in.readVInt(); if (size == 0) return NONE; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java index 13329f3..a6f2179 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java @@ -245,15 +245,15 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter protected void serializeInternal(DataOutputPlus out, int version) throws IOException { ClusteringComparator comparator = (ClusteringComparator)clusterings.comparator(); - out.writeInt(clusterings.size()); + out.writeVInt(clusterings.size()); for (Clustering clustering : clusterings) Clustering.serializer.serialize(clustering, out, version, comparator.subtypes()); } protected long serializedSizeInternal(int version) { - long size = 0; ClusteringComparator comparator = (ClusteringComparator)clusterings.comparator(); + long size = TypeSizes.sizeofVInt(clusterings.size()); for (Clustering clustering : clusterings) size += Clustering.serializer.serializedSize(clustering, version, comparator.subtypes()); return size; @@ -265,7 +265,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter { ClusteringComparator comparator = metadata.comparator; BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(comparator); - int size = in.readInt(); + int size = (int)in.readVInt(); for (int i = 0; i < size; i++) clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index 084bad6..d2cb87d 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -365,7 +365,7 @@ public class ColumnFilter if (selection.subSelections != null) { - out.writeShort(selection.subSelections.size()); + out.writeVInt(selection.subSelections.size()); for (ColumnSubselection subSel : selection.subSelections.values()) ColumnSubselection.serializer.serialize(subSel, out, version); } @@ -390,7 +390,7 @@ public class ColumnFilter if (hasSubSelections) { subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder()); - int size = in.readUnsignedShort(); + int size = (int)in.readVInt(); for (int i = 0; i < size; i++) { ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata); @@ -414,7 +414,7 @@ public class ColumnFilter if (selection.subSelections != null) { - size += TypeSizes.sizeof((short)selection.subSelections.size()); + size += TypeSizes.sizeofVInt(selection.subSelections.size()); for (ColumnSubselection subSel : selection.subSelections.values()) size += ColumnSubselection.serializer.serializedSize(subSel, version); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 206afa4..458ee30 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -17,13 +17,13 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -644,45 +644,45 @@ public abstract class DataLimits case CQL_LIMIT: case CQL_PAGING_LIMIT: CQLLimits cqlLimits = (CQLLimits)limits; - out.writeInt(cqlLimits.rowLimit); - out.writeInt(cqlLimits.perPartitionLimit); + out.writeVInt(cqlLimits.rowLimit); + out.writeVInt(cqlLimits.perPartitionLimit); out.writeBoolean(cqlLimits.isDistinct); if (limits.kind() == Kind.CQL_PAGING_LIMIT) { CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; - ByteBufferUtil.writeWithShortLength(pagingLimits.lastReturnedKey, out); - out.writeInt(pagingLimits.lastReturnedKeyRemaining); + ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out); + out.writeVInt(pagingLimits.lastReturnedKeyRemaining); } break; case THRIFT_LIMIT: case SUPER_COLUMN_COUNTING_LIMIT: ThriftLimits thriftLimits = (ThriftLimits)limits; - out.writeInt(thriftLimits.partitionLimit); - out.writeInt(thriftLimits.cellPerPartitionLimit); + out.writeVInt(thriftLimits.partitionLimit); + out.writeVInt(thriftLimits.cellPerPartitionLimit); break; } } - public DataLimits deserialize(DataInput in, int version) throws IOException + public DataLimits deserialize(DataInputPlus in, int version) throws IOException { Kind kind = Kind.values()[in.readUnsignedByte()]; switch (kind) { case CQL_LIMIT: case CQL_PAGING_LIMIT: - int rowLimit = in.readInt(); - int perPartitionLimit = in.readInt(); + int rowLimit = (int)in.readVInt(); + int perPartitionLimit = (int)in.readVInt(); boolean isDistinct = in.readBoolean(); if (kind == Kind.CQL_LIMIT) return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); - ByteBuffer lastKey = ByteBufferUtil.readWithShortLength(in); - int lastRemaining = in.readInt(); + ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in); + int lastRemaining = (int)in.readVInt(); return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining); case THRIFT_LIMIT: case SUPER_COLUMN_COUNTING_LIMIT: - int partitionLimit = in.readInt(); - int cellPerPartitionLimit = in.readInt(); + int partitionLimit = (int)in.readVInt(); + int cellPerPartitionLimit = (int)in.readVInt(); return kind == Kind.THRIFT_LIMIT ? new ThriftLimits(partitionLimit, cellPerPartitionLimit) : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); @@ -698,21 +698,21 @@ public abstract class DataLimits case CQL_LIMIT: case CQL_PAGING_LIMIT: CQLLimits cqlLimits = (CQLLimits)limits; - size += TypeSizes.sizeof(cqlLimits.rowLimit); - size += TypeSizes.sizeof(cqlLimits.perPartitionLimit); + size += TypeSizes.sizeofVInt(cqlLimits.rowLimit); + size += TypeSizes.sizeofVInt(cqlLimits.perPartitionLimit); size += TypeSizes.sizeof(cqlLimits.isDistinct); if (limits.kind() == Kind.CQL_PAGING_LIMIT) { CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; - size += ByteBufferUtil.serializedSizeWithShortLength(pagingLimits.lastReturnedKey); - size += TypeSizes.sizeof(pagingLimits.lastReturnedKeyRemaining); + size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey); + size += TypeSizes.sizeofVInt(pagingLimits.lastReturnedKeyRemaining); } break; case THRIFT_LIMIT: case SUPER_COLUMN_COUNTING_LIMIT: ThriftLimits thriftLimits = (ThriftLimits)limits; - size += TypeSizes.sizeof(thriftLimits.partitionLimit); - size += TypeSizes.sizeof(thriftLimits.cellPerPartitionLimit); + size += TypeSizes.sizeofVInt(thriftLimits.partitionLimit); + size += TypeSizes.sizeofVInt(thriftLimits.cellPerPartitionLimit); break; default: throw new AssertionError(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 5a49bca..881e154 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; @@ -33,6 +32,7 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; @@ -392,7 +392,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> } } - public Expression deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + public Expression deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { ByteBuffer name = ByteBufferUtil.readWithShortLength(in); Operator operator = Operator.readFrom(in); @@ -742,15 +742,15 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException { out.writeBoolean(filter instanceof ThriftFilter); - out.writeShort(filter.expressions.size()); + out.writeVInt(filter.expressions.size()); for (Expression expr : filter.expressions) Expression.serializer.serialize(expr, out, version); } - public RowFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + public RowFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { boolean forThrift = in.readBoolean(); - int size = in.readUnsignedShort(); + int size = (int)in.readVInt(); List<Expression> expressions = new ArrayList<>(size); for (int i = 0; i < size; i++) expressions.add(Expression.serializer.deserialize(in, version, metadata)); @@ -762,7 +762,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> public long serializedSize(RowFilter filter, int version) { long size = 1 // forThrift - + TypeSizes.sizeof((short)filter.expressions.size()); + + TypeSizes.sizeofVInt(filter.expressions.size()); for (Expression expr : filter.expressions) size += Expression.serializer.serializedSize(expr, version); return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 3a12584..b96e0b1 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -89,7 +89,7 @@ public class UnfilteredRowIteratorSerializer // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException { - ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out); + ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); int flags = 0; if (iterator.isReverseOrder()) @@ -140,7 +140,7 @@ public class UnfilteredRowIteratorSerializer assert rowEstimate >= 0; - long size = TypeSizes.sizeofWithLength(iterator.partitionKey().getKey()) + long size = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey()) + 1; // flags if (iterator.isEmpty()) @@ -170,7 +170,7 @@ public class UnfilteredRowIteratorSerializer public Header deserializeHeader(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag) throws IOException { - DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in)); + DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); boolean isReversed = (flags & IS_REVERSED) != 0; if ((flags & IS_EMPTY) != 0) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a59be269/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index f6eb62a..f306e6d 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -146,7 +146,7 @@ public class UnfilteredSerializer writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse); if (useSparse) - out.writeShort(-1); + out.writeVInt(-1); } private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse) @@ -157,7 +157,7 @@ public class UnfilteredSerializer if (cell == null) return; - out.writeShort(idx); + out.writeVInt(idx); } Cell.serializer.serialize(cell, out, rowLiveness, header); } @@ -170,7 +170,7 @@ public class UnfilteredSerializer if (data == null) return; - out.writeShort(idx); + out.writeVInt(idx); } if (hasComplexDeletion) @@ -244,7 +244,7 @@ public class UnfilteredSerializer size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse); if (useSparse) - size += TypeSizes.sizeof((short)-1); + size += TypeSizes.sizeofVInt(-1); return size; } @@ -257,7 +257,7 @@ public class UnfilteredSerializer if (cell == null) return size; - size += TypeSizes.sizeof((short)idx); + size += TypeSizes.sizeofVInt(idx); } return size + Cell.serializer.serializedSize(cell, rowLiveness, header); } @@ -270,7 +270,7 @@ public class UnfilteredSerializer if (data == null) return size; - size += TypeSizes.sizeof((short)idx); + size += TypeSizes.sizeofVInt(idx); } if (hasComplexDeletion) @@ -388,7 +388,7 @@ public class UnfilteredSerializer int count = columns.columnCount(); int simpleCount = columns.simpleColumnCount(); int i; - while ((i = in.readShort()) >= 0) + while ((i = (int)in.readVInt()) >= 0) { if (i > count) throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count)); @@ -489,7 +489,7 @@ public class UnfilteredSerializer int count = columns.columnCount(); int simpleCount = columns.simpleColumnCount(); int i; - while ((i = in.readShort()) >= 0) + while ((i = (int)in.readVInt()) >= 0) { if (i > count) throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));