http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java deleted file mode 100644 index 55826f5..0000000 --- a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import org.apache.cassandra.io.IVersionedSerializer; - -public class RangeSliceVerbHandler extends ReadCommandVerbHandler -{ - @Override - protected IVersionedSerializer<ReadResponse> serializer() - { - return ReadResponse.rangeSliceSerializer; - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index d8051fe..0bda184 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -37,7 +37,6 @@ import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.index.Index; import org.apache.cassandra.index.IndexNotAvailableException; -import org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -64,43 +63,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery protected static final Logger logger = LoggerFactory.getLogger(ReadCommand.class); public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); - // For READ verb: will either dispatch on 'serializer' for 3.0 or 'legacyReadCommandSerializer' for earlier version. - // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. - public static final IVersionedSerializer<ReadCommand> readSerializer = new ForwardingVersionedSerializer<ReadCommand>() - { - protected IVersionedSerializer<ReadCommand> delegate(int version) - { - return version < MessagingService.VERSION_30 - ? legacyReadCommandSerializer : serializer; - } - }; - - // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 'legacyRangeSliceCommandSerializer' for earlier version. - // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. - public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadCommand>() - { - protected IVersionedSerializer<ReadCommand> delegate(int version) - { - return version < MessagingService.VERSION_30 - ? legacyRangeSliceCommandSerializer : serializer; - } - }; - - // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 'legacyPagedRangeCommandSerializer' for earlier version. - // Can be removed (and replaced by 'serializer') once we drop pre-3.0 backward compatibility. - public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer = new ForwardingVersionedSerializer<ReadCommand>() - { - protected IVersionedSerializer<ReadCommand> delegate(int version) - { - return version < MessagingService.VERSION_30 - ? legacyPagedRangeCommandSerializer : serializer; - } - }; - - public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer(); - public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer(); - public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer(); - private final Kind kind; private final CFMetaData metadata; private final int nowInSec; @@ -580,7 +542,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery /** * Creates a message for this command. */ - public abstract MessageOut<ReadCommand> createMessage(int version); + public abstract MessageOut<ReadCommand> createMessage(); protected abstract void appendCQLWhereClause(StringBuilder sb); @@ -666,8 +628,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { - assert version >= MessagingService.VERSION_30; - out.writeByte(command.kind.ordinal()); out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent())); if (command.isDigestQuery()) @@ -685,8 +645,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery public ReadCommand deserialize(DataInputPlus in, int version) throws IOException { - assert version >= MessagingService.VERSION_30; - Kind kind = Kind.values()[in.readByte()]; int flags = in.readByte(); boolean isDigest = isDigest(flags); @@ -699,8 +657,8 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata.comparator); Optional<IndexMetadata> index = hasIndex - ? deserializeIndexMetadata(in, version, metadata) - : Optional.empty(); + ? deserializeIndexMetadata(in, version, metadata) + : Optional.empty(); return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, index); } @@ -724,8 +682,6 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery public long serializedSize(ReadCommand command, int version) { - assert version >= MessagingService.VERSION_30; - return 2 // kind + flags + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) + CFMetaData.serializer.serializedSize(command.metadata(), version) @@ -737,1015 +693,4 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery + command.indexSerializedSize(version); } } - - private enum LegacyType - { - GET_BY_NAMES((byte)1), - GET_SLICES((byte)2); - - public final byte serializedValue; - - LegacyType(byte b) - { - this.serializedValue = b; - } - - public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind) - { - return kind == ClusteringIndexFilter.Kind.SLICE - ? GET_SLICES - : GET_BY_NAMES; - } - - public static LegacyType fromSerializedValue(byte b) - { - return b == 1 ? GET_BY_NAMES : GET_SLICES; - } - } - - /** - * Serializer for pre-3.0 RangeSliceCommands. - */ - private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand> - { - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; - assert !rangeCommand.dataRange().isPaging(); - - // convert pre-3.0 incompatible names filters to slice filters - rangeCommand = maybeConvertNamesToSlice(rangeCommand); - - CFMetaData metadata = rangeCommand.metadata(); - - out.writeUTF(metadata.ksName); - out.writeUTF(metadata.cfName); - out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis - - // begin DiskAtomFilterSerializer.serialize() - if (rangeCommand.isNamesQuery()) - { - out.writeByte(1); // 0 for slices, 1 for names - ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; - LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out); - } - else - { - out.writeByte(0); // 0 for slices, 1 for names - - // slice filter serialization - ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; - - boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); - - out.writeBoolean(filter.isReversed()); - - // limit - DataLimits limits = rangeCommand.limits(); - if (limits.isDistinct()) - out.writeInt(1); - else - out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); - - int compositesToGroup; - boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT) - compositesToGroup = -1; - else if (limits.isDistinct() && !selectsStatics) - compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) - else - compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size(); - - out.writeInt(compositesToGroup); - } - - serializeRowFilter(out, rangeCommand.rowFilter()); - AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); - - // maxResults - out.writeInt(rangeCommand.limits().count()); - - // countCQL3Rows - if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT - out.writeBoolean(false); - else - out.writeBoolean(true); - - // isPaging - out.writeBoolean(false); - } - - public ReadCommand deserialize(DataInputPlus in, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - String keyspace = in.readUTF(); - String columnFamily = in.readUTF(); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); - if (metadata == null) - { - String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily); - throw new UnknownColumnFamilyException(message, null); - } - - int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds - - ClusteringIndexFilter filter; - ColumnFilter selection; - int compositesToGroup = 0; - int perPartitionLimit = -1; - byte readType = in.readByte(); // 0 for slices, 1 for names - if (readType == 1) - { - Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata); - selection = selectionAndFilter.left; - filter = selectionAndFilter.right; - } - else - { - Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); - filter = p.left; - perPartitionLimit = in.readInt(); - compositesToGroup = in.readInt(); - selection = getColumnSelectionForSlice(p.right, compositesToGroup, metadata); - } - - RowFilter rowFilter = deserializeRowFilter(in, metadata); - - AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); - int maxResults = in.readInt(); - - boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed) - in.readBoolean(); // isPaging (not needed) - - boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); - // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, - // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less - // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use - // that fact. - boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); - DataLimits limits; - if (isDistinct) - limits = DataLimits.distinctLimits(maxResults); - else if (compositesToGroup == -1) - limits = DataLimits.thriftLimits(maxResults, perPartitionLimit); - else - limits = DataLimits.cqlLimits(maxResults); - - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), Optional.empty()); - } - - static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException - { - ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator()); - out.writeInt(indexExpressions.size()); - for (RowFilter.Expression expression : indexExpressions) - { - ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out); - expression.operator().writeTo(out); - ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out); - } - } - - static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException - { - int numRowFilters = in.readInt(); - if (numRowFilters == 0) - return RowFilter.NONE; - - RowFilter rowFilter = RowFilter.create(numRowFilters); - for (int i = 0; i < numRowFilters; i++) - { - ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in); - ColumnDefinition column = metadata.getColumnDefinition(columnName); - Operator op = Operator.readFrom(in); - ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in); - rowFilter.add(column, op, indexValue); - } - return rowFilter; - } - - static long serializedRowFilterSize(RowFilter rowFilter) - { - long size = TypeSizes.sizeof(0); // rowFilterCount - for (RowFilter.Expression expression : rowFilter) - { - size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); - size += TypeSizes.sizeof(0); // operator int value - size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); - } - return size; - } - - public long serializedSize(ReadCommand command, int version) - { - assert version < MessagingService.VERSION_30; - assert command.kind == Kind.PARTITION_RANGE; - - PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; - rangeCommand = maybeConvertNamesToSlice(rangeCommand); - CFMetaData metadata = rangeCommand.metadata(); - - long size = TypeSizes.sizeof(metadata.ksName); - size += TypeSizes.sizeof(metadata.cfName); - size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); - - size += 1; // single byte flag: 0 for slices, 1 for names - if (rangeCommand.isNamesQuery()) - { - PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns(); - ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; - size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns); - } - else - { - ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; - boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); - size += TypeSizes.sizeof(filter.isReversed()); - size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); - size += TypeSizes.sizeof(0); // compositesToGroup - } - - if (rangeCommand.rowFilter().equals(RowFilter.NONE)) - { - size += TypeSizes.sizeof(0); - } - else - { - ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator()); - size += TypeSizes.sizeof(indexExpressions.size()); - for (RowFilter.Expression expression : indexExpressions) - { - size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); - size += TypeSizes.sizeof(expression.operator().ordinal()); - size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); - } - } - - size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); - size += TypeSizes.sizeof(rangeCommand.limits().count()); - size += TypeSizes.sizeof(!rangeCommand.isForThrift()); - return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging()); - } - - static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command) - { - if (!command.dataRange().isNamesQuery()) - return command; - - CFMetaData metadata = command.metadata(); - if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) - return command; - - ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter; - ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); - DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); - return new PartitionRangeReadCommand( - command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), - command.columnFilter(), command.rowFilter(), command.limits(), newRange, Optional.empty()); - } - - static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, int compositesToGroup, CFMetaData metadata) - { - // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys. - // In that case, we'll basically be querying the first row of the partition, but we must make sure we include - // all columns so we get at least one cell if there is a live row as it would confuse pre-3.0 nodes otherwise. - if (compositesToGroup == -2) - return ColumnFilter.all(metadata); - - // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all - PartitionColumns columns = selectsStatics - ? metadata.partitionColumns() - : metadata.partitionColumns().withoutStatics(); - return ColumnFilter.selectionBuilder().addAll(columns).build(); - } - } - - /** - * Serializer for pre-3.0 PagedRangeCommands. - */ - private static class LegacyPagedRangeCommandSerializer implements IVersionedSerializer<ReadCommand> - { - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; - assert rangeCommand.dataRange().isPaging(); - - CFMetaData metadata = rangeCommand.metadata(); - - out.writeUTF(metadata.ksName); - out.writeUTF(metadata.cfName); - out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis - - AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); - - // pre-3.0 nodes don't accept names filters for paged range commands - ClusteringIndexSliceFilter filter; - if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES) - filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata); - else - filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; - - // slice filter - boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); - out.writeBoolean(filter.isReversed()); - - // slice filter's count - DataLimits.Kind kind = rangeCommand.limits().kind(); - boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1; - if (isDistinct) - out.writeInt(1); - else - out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(), filter.requestedSlices())); - - // compositesToGroup - boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - int compositesToGroup; - if (kind == DataLimits.Kind.THRIFT_LIMIT) - compositesToGroup = -1; - else if (isDistinct && !selectsStatics) - compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) - else - compositesToGroup = metadata.isDense() ? -1 : metadata.clusteringColumns().size(); - - out.writeInt(compositesToGroup); - - // command-level "start" and "stop" composites. The start is the last-returned cell name if there is one, - // otherwise it's the same as the slice filter's start. The stop appears to always be the same as the - // slice filter's stop. - DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange(); - Clustering lastReturned = pagingRange.getLastReturned(); - ClusteringBound newStart = ClusteringBound.inclusiveStartOf(lastReturned); - Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1); - ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true), out); - ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()), out); - - LegacyRangeSliceCommandSerializer.serializeRowFilter(out, rangeCommand.rowFilter()); - - // command-level limit - // Pre-3.0 we would always request one more row than we actually needed and the command-level "start" would - // be the last-returned cell name, so the response would always include it. - int maxResults = rangeCommand.limits().count() + 1; - out.writeInt(maxResults); - - // countCQL3Rows - if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // for Thrift or DISTINCT - out.writeBoolean(false); - else - out.writeBoolean(true); - } - - public ReadCommand deserialize(DataInputPlus in, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - String keyspace = in.readUTF(); - String columnFamily = in.readUTF(); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); - if (metadata == null) - { - String message = String.format("Got legacy paged range command for nonexistent table %s.%s.", keyspace, columnFamily); - throw new UnknownColumnFamilyException(message, null); - } - - int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds - AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); - - Pair<ClusteringIndexSliceFilter, Boolean> p = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); - ClusteringIndexSliceFilter filter = p.left; - boolean selectsStatics = p.right; - - int perPartitionLimit = in.readInt(); - int compositesToGroup = in.readInt(); - - // command-level Composite "start" and "stop" - LegacyLayout.LegacyBound startBound = LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true); - - ByteBufferUtil.readWithShortLength(in); // the composite "stop", which isn't actually needed - - ColumnFilter selection = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata); - - RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata); - int maxResults = in.readInt(); - boolean countCQL3Rows = in.readBoolean(); - - // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, - // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less - // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use - // that fact. - boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); - DataLimits limits; - if (isDistinct) - limits = DataLimits.distinctLimits(maxResults); - else - limits = DataLimits.cqlLimits(maxResults); - - limits = limits.forPaging(maxResults); - - // The pagedRangeCommand is used in pre-3.0 for both the first page and the following ones. On the first page, the startBound will be - // the start of the overall slice and will not be a proper Clustering. So detect that case and just return a non-paging DataRange, which - // is what 3.0 does. - DataRange dataRange = new DataRange(keyRange, filter); - Slices slices = filter.requestedSlices(); - if (!isDistinct && startBound != LegacyLayout.LegacyBound.BOTTOM && !startBound.bound.equals(slices.get(0).start())) - { - // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it - // missing without any problems, so we can safely always set "inclusive" to false in the data range - dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false); - } - return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange, Optional.empty()); - } - - public long serializedSize(ReadCommand command, int version) - { - assert version < MessagingService.VERSION_30; - assert command.kind == Kind.PARTITION_RANGE; - - PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; - CFMetaData metadata = rangeCommand.metadata(); - assert rangeCommand.dataRange().isPaging(); - - long size = TypeSizes.sizeof(metadata.ksName); - size += TypeSizes.sizeof(metadata.cfName); - size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); - - size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); - - // pre-3.0 nodes only accept slice filters for paged range commands - ClusteringIndexSliceFilter filter; - if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES) - filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata); - else - filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; - - // slice filter - boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); - size += TypeSizes.sizeof(filter.isReversed()); - - // slice filter's count - size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); - - // compositesToGroup - size += TypeSizes.sizeof(0); - - // command-level Composite "start" and "stop" - DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange(); - Clustering lastReturned = pagingRange.getLastReturned(); - Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1); - size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastReturned)); - size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering())); - - size += LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter()); - - // command-level limit - size += TypeSizes.sizeof(rangeCommand.limits().count()); - - // countCQL3Rows - return size + TypeSizes.sizeof(true); - } - } - - /** - * Serializer for pre-3.0 ReadCommands. - */ - static class LegacyReadCommandSerializer implements IVersionedSerializer<ReadCommand> - { - public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - assert command.kind == Kind.SINGLE_PARTITION; - - SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command; - singleReadCommand = maybeConvertNamesToSlice(singleReadCommand); - - CFMetaData metadata = singleReadCommand.metadata(); - - out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue); - - out.writeBoolean(singleReadCommand.isDigestQuery()); - out.writeUTF(metadata.ksName); - ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), out); - out.writeUTF(metadata.cfName); - out.writeLong(singleReadCommand.nowInSec() * 1000L); // convert from seconds to millis - - if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE) - serializeSliceCommand(singleReadCommand, out); - else - serializeNamesCommand(singleReadCommand, out); - } - - public ReadCommand deserialize(DataInputPlus in, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - LegacyType msgType = LegacyType.fromSerializedValue(in.readByte()); - - boolean isDigest = in.readBoolean(); - String keyspaceName = in.readUTF(); - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - String cfName = in.readUTF(); - long nowInMillis = in.readLong(); - int nowInSeconds = (int) (nowInMillis / 1000); // convert from millis to seconds - CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); - DecoratedKey dk = metadata.partitioner.decorateKey(key); - - switch (msgType) - { - case GET_BY_NAMES: - return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds, version); - case GET_SLICES: - return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds, version); - default: - throw new AssertionError(); - } - } - - public long serializedSize(ReadCommand command, int version) - { - assert version < MessagingService.VERSION_30; - assert command.kind == Kind.SINGLE_PARTITION; - SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command; - singleReadCommand = maybeConvertNamesToSlice(singleReadCommand); - - int keySize = singleReadCommand.partitionKey().getKey().remaining(); - - CFMetaData metadata = singleReadCommand.metadata(); - - long size = 1; // message type (single byte) - size += TypeSizes.sizeof(command.isDigestQuery()); - size += TypeSizes.sizeof(metadata.ksName); - size += TypeSizes.sizeof((short) keySize) + keySize; - size += TypeSizes.sizeof((long) command.nowInSec()); - - if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE) - return size + serializedSliceCommandSize(singleReadCommand); - else - return size + serializedNamesCommandSize(singleReadCommand); - } - - private void serializeNamesCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException - { - serializeNamesFilter(command, (ClusteringIndexNamesFilter)command.clusteringIndexFilter(), out); - } - - private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException - { - PartitionColumns columns = command.columnFilter().fetchedColumns(); - CFMetaData metadata = command.metadata(); - SortedSet<Clustering> requestedRows = filter.requestedRows(); - - if (requestedRows.isEmpty()) - { - // only static columns are requested - out.writeInt(columns.size()); - for (ColumnDefinition column : columns) - ByteBufferUtil.writeWithShortLength(column.name.bytes, out); - } - else - { - out.writeInt(requestedRows.size() * columns.size()); - for (Clustering clustering : requestedRows) - { - for (ColumnDefinition column : columns) - ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null), out); - } - } - - // countCql3Rows should be true if it's not for Thrift or a DISTINCT query - if (command.isForThrift() || (command.limits().kind() == DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1)) - out.writeBoolean(false); // it's compact and not a DISTINCT query - else - out.writeBoolean(true); - } - - static long serializedNamesFilterSize(ClusteringIndexNamesFilter filter, CFMetaData metadata, PartitionColumns fetchedColumns) - { - SortedSet<Clustering> requestedRows = filter.requestedRows(); - - long size = 0; - if (requestedRows.isEmpty()) - { - // only static columns are requested - size += TypeSizes.sizeof(fetchedColumns.size()); - for (ColumnDefinition column : fetchedColumns) - size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes); - } - else - { - size += TypeSizes.sizeof(requestedRows.size() * fetchedColumns.size()); - for (Clustering clustering : requestedRows) - { - for (ColumnDefinition column : fetchedColumns) - size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null)); - } - } - - return size + TypeSizes.sizeof(true); // countCql3Rows - } - - private SinglePartitionReadCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException - { - Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata); - - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand( - isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE, - key, selectionAndFilter.right); - } - - static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException - { - int numCellNames = in.readInt(); - - // The names filter could include either a) static columns or b) normal columns with the clustering columns - // fully specified. We need to handle those cases differently in 3.0. - NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator); - - ColumnFilter.Builder selectionBuilder = ColumnFilter.selectionBuilder(); - for (int i = 0; i < numCellNames; i++) - { - ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in); - LegacyLayout.LegacyCellName cellName; - try - { - cellName = LegacyLayout.decodeCellName(metadata, buffer); - } - catch (UnknownColumnException exc) - { - // TODO this probably needs a new exception class that shares a parent with UnknownColumnFamilyException - throw new UnknownColumnFamilyException( - "Received legacy range read command with names filter for unrecognized column name. " + - "Fill name in filter (hex): " + ByteBufferUtil.bytesToHex(buffer), metadata.cfId); - } - - // If we're querying for a static column, we may also need to read it - // as if it were a thrift dynamic column (because the column metadata, - // which makes it a static column in 3.0+, may have been added *after* - // some values were written). Note that all cql queries on non-compact - // tables used slice & not name filters prior to 3.0 so this path is - // not taken for non-compact tables. It is theoretically possible to - // get here via thrift, hence the check on metadata.isStaticCompactTable. - // See CASSANDRA-11087. - if (metadata.isStaticCompactTable() && cellName.clustering.equals(Clustering.STATIC_CLUSTERING)) - { - clusterings.add(Clustering.make(cellName.column.name.bytes)); - selectionBuilder.add(metadata.compactValueColumn()); - } - else - { - clusterings.add(cellName.clustering); - } - - selectionBuilder.add(cellName.column); - } - - // for compact storage tables without clustering keys, the column holding the selected value is named - // 'value' internally we add it to the selection here to prevent errors due to unexpected column names - // when serializing the initial local data response - if (metadata.isStaticCompactTable() && clusterings.isEmpty()) - selectionBuilder.addAll(metadata.partitionColumns()); - - in.readBoolean(); // countCql3Rows - - // clusterings cannot include STATIC_CLUSTERING, so if the names filter is for static columns, clusterings - // will be empty. However, by requesting the static columns in our ColumnFilter, this will still work. - ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false); - return Pair.create(selectionBuilder.build(), filter); - } - - private long serializedNamesCommandSize(SinglePartitionReadCommand command) - { - ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter(); - PartitionColumns columns = command.columnFilter().fetchedColumns(); - return serializedNamesFilterSize(filter, command.metadata(), columns); - } - - private void serializeSliceCommand(SinglePartitionReadCommand command, DataOutputPlus out) throws IOException - { - CFMetaData metadata = command.metadata(); - ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter(); - - Slices slices = filter.requestedSlices(); - boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING); - serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, metadata); - - out.writeBoolean(filter.isReversed()); - - boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING); - DataLimits limits = command.limits(); - if (limits.isDistinct()) - out.writeInt(1); // the limit is always 1 for DISTINCT queries - else - out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices())); - - int compositesToGroup; - if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense()) - compositesToGroup = -1; - else if (limits.isDistinct() && !selectsStatics) - compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) - else - compositesToGroup = metadata.clusteringColumns().size(); - - out.writeInt(compositesToGroup); - } - - private SinglePartitionReadCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException - { - Pair<ClusteringIndexSliceFilter, Boolean> p = deserializeSlicePartitionFilter(in, metadata); - ClusteringIndexSliceFilter filter = p.left; - boolean selectsStatics = p.right; - int count = in.readInt(); - int compositesToGroup = in.readInt(); - - // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all - ColumnFilter columnFilter = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata); - - // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, - // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter is probablematic - // however as we have no way to distinguish it from a normal select with a limit of 1 (and this, contrarily to the range query case - // were the countCQL3Rows boolean allows us to decide). - // So we consider this case not distinct here. This is ok because even if it is a distinct (with static), the count will be 1 and - // we'll still just query one row (a distinct DataLimits currently behave exactly like a CQL limit with a count of 1). The only - // drawback is that we'll send back the first row entirely while a 2.1/2.2 node would return only the first cell in that same - // situation. This isn't a problem for 2.1/2.2 code however (it would be for a range query, as it would throw off the count for - // reasons similar to CASSANDRA-10762, but it's ok for single partition queries). - // We do _not_ want to do the reverse however and consider a 'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make - // us only return the 1st cell rather then 1st row. - DataLimits limits; - if (compositesToGroup == -2) - limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value - else if (compositesToGroup == -1) - limits = DataLimits.thriftLimits(1, count); - else - limits = DataLimits.cqlLimits(count); - - // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionReadCommand(isDigest, version, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter); - } - - private long serializedSliceCommandSize(SinglePartitionReadCommand command) - { - CFMetaData metadata = command.metadata(); - ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter)command.clusteringIndexFilter(); - - Slices slices = filter.requestedSlices(); - boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING); - - long size = serializedSlicesSize(slices, makeStaticSlice, metadata); - size += TypeSizes.sizeof(command.clusteringIndexFilter().isReversed()); - size += TypeSizes.sizeof(command.limits().count()); - return size + TypeSizes.sizeof(0); // compositesToGroup - } - - static void serializeSlices(DataOutputPlus out, Slices slices, boolean isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException - { - out.writeInt(slices.size() + (makeStaticSlice ? 1 : 0)); - - // In 3.0 we always store the slices in normal comparator order. Pre-3.0 nodes expect the slices to - // be in reversed order if the query is reversed, so we handle that here. - if (isReversed) - { - for (int i = slices.size() - 1; i >= 0; i--) - serializeSlice(out, slices.get(i), true, metadata); - if (makeStaticSlice) - serializeStaticSlice(out, true, metadata); - } - else - { - if (makeStaticSlice) - serializeStaticSlice(out, false, metadata); - for (Slice slice : slices) - serializeSlice(out, slice, false, metadata); - } - } - - static long serializedSlicesSize(Slices slices, boolean makeStaticSlice, CFMetaData metadata) - { - long size = TypeSizes.sizeof(slices.size()); - - for (Slice slice : slices) - { - ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, slice.start(), true); - size += ByteBufferUtil.serializedSizeWithShortLength(sliceStart); - ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, slice.end(), false); - size += ByteBufferUtil.serializedSizeWithShortLength(sliceEnd); - } - - if (makeStaticSlice) - size += serializedStaticSliceSize(metadata); - - return size; - } - - static long serializedStaticSliceSize(CFMetaData metadata) - { - // unlike serializeStaticSlice(), but we don't care about reversal for size calculations - ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false); - long size = ByteBufferUtil.serializedSizeWithShortLength(sliceStart); - - size += TypeSizes.sizeof((short) (metadata.comparator.size() * 3 + 2)); - size += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX); - for (int i = 0; i < metadata.comparator.size(); i++) - { - size += ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER); - size += 1; // EOC - } - return size; - } - - private static void serializeSlice(DataOutputPlus out, Slice slice, boolean isReversed, CFMetaData metadata) throws IOException - { - ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, isReversed ? slice.end() : slice.start(), !isReversed); - ByteBufferUtil.writeWithShortLength(sliceStart, out); - - ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, isReversed ? slice.start() : slice.end(), isReversed); - ByteBufferUtil.writeWithShortLength(sliceEnd, out); - } - - private static void serializeStaticSlice(DataOutputPlus out, boolean isReversed, CFMetaData metadata) throws IOException - { - // if reversed, write an empty bound for the slice start; if reversed, write out an empty bound for the - // slice finish after we've written the static slice start - if (!isReversed) - { - ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false); - ByteBufferUtil.writeWithShortLength(sliceStart, out); - } - - // write out the length of the composite - out.writeShort(2 + metadata.comparator.size() * 3); // two bytes + EOC for each component, plus static prefix - out.writeShort(LegacyLayout.STATIC_PREFIX); - for (int i = 0; i < metadata.comparator.size(); i++) - { - ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out); - // write the EOC, using an inclusive end if we're on the final component - out.writeByte(i == metadata.comparator.size() - 1 ? 1 : 0); - } - - if (isReversed) - { - ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, ClusteringBound.BOTTOM, false); - ByteBufferUtil.writeWithShortLength(sliceStart, out); - } - } - - // Returns the deserialized filter, and whether static columns are queried (in pre-3.0, both info are determined by the slices, - // but in 3.0 they are separated: whether static columns are queried or not depends on the ColumnFilter). - static Pair<ClusteringIndexSliceFilter, Boolean> deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws IOException - { - int numSlices = in.readInt(); - ByteBuffer[] startBuffers = new ByteBuffer[numSlices]; - ByteBuffer[] finishBuffers = new ByteBuffer[numSlices]; - for (int i = 0; i < numSlices; i++) - { - startBuffers[i] = ByteBufferUtil.readWithShortLength(in); - finishBuffers[i] = ByteBufferUtil.readWithShortLength(in); - } - - boolean reversed = in.readBoolean(); - - if (reversed) - { - // pre-3.0, reversed query slices put the greater element at the start of the slice - ByteBuffer[] tmp = finishBuffers; - finishBuffers = startBuffers; - startBuffers = tmp; - } - - boolean selectsStatics = false; - Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator); - for (int i = 0; i < numSlices; i++) - { - LegacyLayout.LegacyBound start = LegacyLayout.decodeBound(metadata, startBuffers[i], true); - LegacyLayout.LegacyBound finish = LegacyLayout.decodeBound(metadata, finishBuffers[i], false); - - if (start.isStatic) - { - // If we start at the static block, this means we start at the beginning of the partition in 3.0 - // terms (since 3.0 handles static outside of the slice). - start = LegacyLayout.LegacyBound.BOTTOM; - - // Then if we include the static, records it - if (start.bound.isInclusive()) - selectsStatics = true; - } - else if (start == LegacyLayout.LegacyBound.BOTTOM) - { - selectsStatics = true; - } - - // If the end of the slice is the end of the statics, then that mean this slice was just selecting static - // columns. We have already recorded that in selectsStatics, so we can ignore the slice (which doesn't make - // sense for 3.0). - if (finish.isStatic) - { - assert finish.bound.isInclusive(); // it would make no sense for a pre-3.0 node to have a slice that stops - // before the static columns (since there is nothing before that) - continue; - } - - slicesBuilder.add(Slice.make(start.bound, finish.bound)); - } - - return Pair.create(new ClusteringIndexSliceFilter(slicesBuilder.build(), reversed), selectsStatics); - } - - private static SinglePartitionReadCommand maybeConvertNamesToSlice(SinglePartitionReadCommand command) - { - if (command.clusteringIndexFilter().kind() != ClusteringIndexFilter.Kind.NAMES) - return command; - - CFMetaData metadata = command.metadata(); - - if (!shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) - return command; - - ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter)command.clusteringIndexFilter(); - ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata); - return new SinglePartitionReadCommand( - command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), - command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter); - } - - /** - * Returns true if a names filter on the given table and column selection should be converted to a slice - * filter for compatibility with pre-3.0 nodes, false otherwise. - */ - static boolean shouldConvertNamesToSlice(CFMetaData metadata, PartitionColumns columns) - { - // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite). - if (!metadata.isDense() && metadata.isCompound()) - return true; - - // pre-3.0 nodes don't support names filters for reading collections, so if we're requesting any of those, - // we need to convert this to a slice filter - for (ColumnDefinition column : columns) - { - if (column.type.isMultiCell()) - return true; - } - return false; - } - - /** - * Converts a names filter that is incompatible with pre-3.0 nodes to a slice filter that is compatible. - */ - private static ClusteringIndexSliceFilter convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData metadata) - { - SortedSet<Clustering> requestedRows = filter.requestedRows(); - Slices slices; - if (requestedRows.isEmpty()) - { - slices = Slices.NONE; - } - else if (requestedRows.size() == 1 && requestedRows.first().size() == 0) - { - slices = Slices.ALL; - } - else - { - Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator); - for (Clustering clustering : requestedRows) - slicesBuilder.add(ClusteringBound.inclusiveStartOf(clustering), ClusteringBound.inclusiveEndOf(clustering)); - slices = slicesBuilder.build(); - } - - return new ClusteringIndexSliceFilter(slices, filter.isReversed()); - } - - /** - * Potentially increases the existing query limit to account for the lack of exclusive bounds in pre-3.0 nodes. - * @param limit the existing query limit - * @param slices the requested slices - * @return the updated limit - */ - static int updateLimitForQuery(int limit, Slices slices) - { - // Pre-3.0 nodes don't support exclusive bounds for slices. Instead, we query one more element if necessary - // and filter it later (in LegacyRemoteDataResponse) - if (!slices.hasLowerBound() && ! slices.hasUpperBound()) - return limit; - - for (Slice slice : slices) - { - if (limit == Integer.MAX_VALUE) - return limit; - - if (!slice.start().isInclusive()) - limit++; - if (!slice.end().isInclusive()) - limit++; - } - return limit; - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index cca21f8..c3eae0d 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -32,7 +32,6 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; -import org.apache.cassandra.io.ForwardingVersionedSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; @@ -47,20 +46,6 @@ public abstract class ReadResponse { // Serializer for single partition read response public static final IVersionedSerializer<ReadResponse> serializer = new Serializer(); - // Serializer for the pre-3.0 rang slice responses. - public static final IVersionedSerializer<ReadResponse> legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer(); - // Serializer for partition range read response (this actually delegate to 'serializer' in 3.0 and to - // 'legacyRangeSliceReplySerializer' in older version. - public static final IVersionedSerializer<ReadResponse> rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>() - { - @Override - protected IVersionedSerializer<ReadResponse> delegate(int version) - { - return version < MessagingService.VERSION_30 - ? legacyRangeSliceReplySerializer - : serializer; - } - }; // This is used only when serializing data responses and we can't it easily in other cases. So this can be null, which is slighly // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough". @@ -95,7 +80,7 @@ public abstract class ReadResponse protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command) { MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion()); + UnfilteredPartitionIterators.digest(iterator, digest, command.digestVersion()); return ByteBuffer.wrap(digest.digest()); } @@ -210,130 +195,12 @@ public abstract class ReadResponse } } - /** - * A remote response from a pre-3.0 node. This needs a separate class in order to cleanly handle trimming and - * reversal of results when the read command calls for it. Pre-3.0 nodes always return results in the normal - * sorted order, even if the query asks for reversed results. Additionally, pre-3.0 nodes do not have a notion of - * exclusive slices on non-composite tables, so extra rows may need to be trimmed. - */ - @VisibleForTesting - static class LegacyRemoteDataResponse extends ReadResponse - { - private final List<ImmutableBTreePartition> partitions; - - @VisibleForTesting - LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions) - { - super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the command - this.partitions = partitions; - } - - public UnfilteredPartitionIterator makeIterator(final ReadCommand command) - { - // Due to a bug in the serialization of AbstractBounds, anything that isn't a Range is understood by pre-3.0 nodes - // as a Bound, which means IncludingExcludingBounds and ExcludingBounds responses may include keys they shouldn't. - // So filter partitions that shouldn't be included here. - boolean skipFirst = false; - boolean skipLast = false; - if (!partitions.isEmpty() && command instanceof PartitionRangeReadCommand) - { - AbstractBounds<PartitionPosition> keyRange = ((PartitionRangeReadCommand)command).dataRange().keyRange(); - boolean isExcludingBounds = keyRange instanceof ExcludingBounds; - skipFirst = isExcludingBounds && !keyRange.contains(partitions.get(0).partitionKey()); - skipLast = (isExcludingBounds || keyRange instanceof IncludingExcludingBounds) && !keyRange.contains(partitions.get(partitions.size() - 1).partitionKey()); - } - - final List<ImmutableBTreePartition> toReturn; - if (skipFirst || skipLast) - { - toReturn = partitions.size() == 1 - ? Collections.emptyList() - : partitions.subList(skipFirst ? 1 : 0, skipLast ? partitions.size() - 1 : partitions.size()); - } - else - { - toReturn = partitions; - } - - return new AbstractUnfilteredPartitionIterator() - { - private int idx; - - public boolean isForThrift() - { - return true; - } - - public CFMetaData metadata() - { - return command.metadata(); - } - - public boolean hasNext() - { - return idx < toReturn.size(); - } - - public UnfilteredRowIterator next() - { - ImmutableBTreePartition partition = toReturn.get(idx++); - - ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey()); - - // Pre-3.0, we would always request one more row than we actually needed and the command-level "start" would - // be the last-returned cell name, so the response would always include it. - UnfilteredRowIterator iterator = partition.unfilteredIterator(command.columnFilter(), filter.getSlices(command.metadata()), filter.isReversed()); - - // Wrap results with a ThriftResultMerger only if they're intended for the thrift command. - if (command.isForThrift()) - return ThriftResultsMerger.maybeWrap(iterator, command.nowInSec()); - else - return iterator; - } - }; - } - - public ByteBuffer digest(ReadCommand command) - { - try (UnfilteredPartitionIterator iterator = makeIterator(command)) - { - return makeDigest(iterator, command); - } - } - - public boolean isDigestResponse() - { - return false; - } - } - private static class Serializer implements IVersionedSerializer<ReadResponse> { public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException { boolean isDigest = response instanceof DigestResponse; ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER; - if (version < MessagingService.VERSION_30) - { - out.writeInt(digest.remaining()); - out.write(digest); - out.writeBoolean(isDigest); - if (!isDigest) - { - assert response.command != null; // we only serialize LocalDataResponse, which always has the command set - try (UnfilteredPartitionIterator iter = response.makeIterator(response.command)) - { - assert iter.hasNext(); - try (UnfilteredRowIterator partition = iter.next()) - { - ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); - LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version); - } - assert !iter.hasNext(); - } - } - return; - } ByteBufferUtil.writeWithVIntLength(digest, out); if (!isDigest) @@ -345,38 +212,12 @@ public abstract class ReadResponse public ReadResponse deserialize(DataInputPlus in, int version) throws IOException { - if (version < MessagingService.VERSION_30) - { - byte[] digest = null; - int digestSize = in.readInt(); - if (digestSize > 0) - { - digest = new byte[digestSize]; - in.readFully(digest, 0, digestSize); - } - boolean isDigest = in.readBoolean(); - assert isDigest == digestSize > 0; - if (isDigest) - { - assert digest != null; - return new DigestResponse(ByteBuffer.wrap(digest)); - } - - // ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply) - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - try (UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key)) - { - if (rowIterator == null) - return new LegacyRemoteDataResponse(Collections.emptyList()); - - return new LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator))); - } - } - ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in); if (digest.hasRemaining()) return new DigestResponse(digest); + // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the + // version, we'll have to deserialize/re-serialize the data to be in the proper version. assert version == MessagingService.VERSION_30; ByteBuffer data = ByteBufferUtil.readWithVIntLength(in); return new RemoteDataResponse(data); @@ -387,28 +228,6 @@ public abstract class ReadResponse boolean isDigest = response instanceof DigestResponse; ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER; - if (version < MessagingService.VERSION_30) - { - long size = TypeSizes.sizeof(digest.remaining()) - + digest.remaining() - + TypeSizes.sizeof(isDigest); - if (!isDigest) - { - assert response.command != null; // we only serialize LocalDataResponse, which always has the command set - try (UnfilteredPartitionIterator iter = response.makeIterator(response.command)) - { - assert iter.hasNext(); - try (UnfilteredRowIterator partition = iter.next()) - { - size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); - size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version); - } - assert !iter.hasNext(); - } - } - return size; - } - long size = ByteBufferUtil.serializedSizeWithVIntLength(digest); if (!isDigest) { @@ -421,81 +240,4 @@ public abstract class ReadResponse return size; } } - - private static class LegacyRangeSliceReplySerializer implements IVersionedSerializer<ReadResponse> - { - public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - // determine the number of partitions upfront for serialization - int numPartitions = 0; - assert response.command != null; // we only serialize LocalDataResponse, which always has the command set - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) - { - while (iterator.hasNext()) - { - try (UnfilteredRowIterator atomIterator = iterator.next()) - { - numPartitions++; - - // we have to fully exhaust the subiterator - while (atomIterator.hasNext()) - atomIterator.next(); - } - } - } - - out.writeInt(numPartitions); - - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) - { - while (iterator.hasNext()) - { - try (UnfilteredRowIterator partition = iterator.next()) - { - ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); - LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version); - } - } - } - } - - public ReadResponse deserialize(DataInputPlus in, int version) throws IOException - { - assert version < MessagingService.VERSION_30; - - int partitionCount = in.readInt(); - ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount); - for (int i = 0; i < partitionCount; i++) - { - ByteBuffer key = ByteBufferUtil.readWithShortLength(in); - try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key)) - { - partitions.add(ImmutableBTreePartition.create(partition)); - } - } - return new LegacyRemoteDataResponse(partitions); - } - - public long serializedSize(ReadResponse response, int version) - { - assert version < MessagingService.VERSION_30; - long size = TypeSizes.sizeof(0); // number of partitions - - assert response.command != null; // we only serialize LocalDataResponse, which always has the command set - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) - { - while (iterator.hasNext()) - { - try (UnfilteredRowIterator partition = iterator.next()) - { - size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); - size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version); - } - } - } - return size; - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java b/src/java/org/apache/cassandra/db/RowIndexEntry.java index e620dc0..a709ec3 100644 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@ -253,7 +253,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory public Serializer(CFMetaData metadata, Version version, SerializationHeader header) { - this.idxInfoSerializer = metadata.serializers().indexInfoSerializer(version, header); + this.idxInfoSerializer = IndexInfo.serializer(version, header); this.version = version; } @@ -264,22 +264,16 @@ public class RowIndexEntry<T> implements IMeasurableMemory public void serialize(RowIndexEntry<IndexInfo> rie, DataOutputPlus out, ByteBuffer indexInfo) throws IOException { - assert version.storeRows() : "We read old index files but we should never write them"; - rie.serialize(out, idxInfoSerializer, indexInfo); } public void serializeForCache(RowIndexEntry<IndexInfo> rie, DataOutputPlus out) throws IOException { - assert version.storeRows(); - rie.serializeForCache(out); } public RowIndexEntry<IndexInfo> deserializeForCache(DataInputPlus in) throws IOException { - assert version.storeRows(); - long position = in.readUnsignedVInt(); switch (in.readByte()) @@ -297,8 +291,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory public static void skipForCache(DataInputPlus in, Version version) throws IOException { - assert version.storeRows(); - /* long position = */in.readUnsignedVInt(); switch (in.readByte()) { @@ -317,9 +309,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition) throws IOException { - if (!version.storeRows()) - return LegacyShallowIndexedEntry.deserialize(in, indexFilePosition, idxInfoSerializer); - long position = in.readUnsignedVInt(); int size = (int)in.readUnsignedVInt(); @@ -354,9 +343,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory public long deserializePositionAndSkip(DataInputPlus in) throws IOException { - if (!version.storeRows()) - return LegacyShallowIndexedEntry.deserializePositionAndSkip(in); - return ShallowIndexedEntry.deserializePositionAndSkip(in); } @@ -367,7 +353,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory */ public static long readPosition(DataInputPlus in, Version version) throws IOException { - return version.storeRows() ? in.readUnsignedVInt() : in.readLong(); + return in.readUnsignedVInt(); } public static void skip(DataInputPlus in, Version version) throws IOException @@ -378,7 +364,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException { - int size = version.storeRows() ? (int)in.readUnsignedVInt() : in.readInt(); + int size = (int)in.readUnsignedVInt(); if (size <= 0) return; @@ -413,164 +399,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory out.writeByte(CACHE_NOT_INDEXED); } - private static final class LegacyShallowIndexedEntry extends RowIndexEntry<IndexInfo> - { - private static final long BASE_SIZE; - static - { - BASE_SIZE = ObjectSizes.measure(new LegacyShallowIndexedEntry(0, 0, DeletionTime.LIVE, 0, new int[0], null, 0)); - } - - private final long indexFilePosition; - private final int[] offsets; - @Unmetered - private final IndexInfo.Serializer idxInfoSerializer; - private final DeletionTime deletionTime; - private final long headerLength; - private final int serializedSize; - - private LegacyShallowIndexedEntry(long dataFilePosition, long indexFilePosition, - DeletionTime deletionTime, long headerLength, - int[] offsets, IndexInfo.Serializer idxInfoSerializer, - int serializedSize) - { - super(dataFilePosition); - this.deletionTime = deletionTime; - this.headerLength = headerLength; - this.indexFilePosition = indexFilePosition; - this.offsets = offsets; - this.idxInfoSerializer = idxInfoSerializer; - this.serializedSize = serializedSize; - } - - @Override - public DeletionTime deletionTime() - { - return deletionTime; - } - - @Override - public long headerLength() - { - return headerLength; - } - - @Override - public long unsharedHeapSize() - { - return BASE_SIZE + offsets.length * TypeSizes.sizeof(0); - } - - @Override - public int columnsIndexCount() - { - return offsets.length; - } - - @Override - public void serialize(DataOutputPlus out, IndexInfo.Serializer idxInfoSerializer, ByteBuffer indexInfo) - { - throw new UnsupportedOperationException("serializing legacy index entries is not supported"); - } - - @Override - public void serializeForCache(DataOutputPlus out) - { - throw new UnsupportedOperationException("serializing legacy index entries is not supported"); - } - - @Override - public IndexInfoRetriever openWithIndex(FileHandle indexFile) - { - int fieldsSize = (int) DeletionTime.serializer.serializedSize(deletionTime) - + TypeSizes.sizeof(0); // columnIndexCount - indexEntrySizeHistogram.update(serializedSize); - indexInfoCountHistogram.update(offsets.length); - return new LegacyIndexInfoRetriever(indexFilePosition + - TypeSizes.sizeof(0L) + // position - TypeSizes.sizeof(0) + // indexInfoSize - fieldsSize, - offsets, indexFile.createReader(), idxInfoSerializer); - } - - public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long indexFilePosition, - IndexInfo.Serializer idxInfoSerializer) throws IOException - { - long dataFilePosition = in.readLong(); - - int size = in.readInt(); - if (size == 0) - { - return new RowIndexEntry<>(dataFilePosition); - } - else if (size <= DatabaseDescriptor.getColumnIndexCacheSize()) - { - return new IndexedEntry(dataFilePosition, in, idxInfoSerializer); - } - else - { - DeletionTime deletionTime = DeletionTime.serializer.deserialize(in); - - // For legacy sstables (i.e. sstables pre-"ma", pre-3.0) we have to scan all serialized IndexInfo - // objects to calculate the offsets array. However, it might be possible to deserialize all - // IndexInfo objects here - but to just skip feels more gentle to the heap/GC. - - int entries = in.readInt(); - int[] offsets = new int[entries]; - - TrackedDataInputPlus tracked = new TrackedDataInputPlus(in); - long start = tracked.getBytesRead(); - long headerLength = 0L; - for (int i = 0; i < entries; i++) - { - offsets[i] = (int) (tracked.getBytesRead() - start); - if (i == 0) - { - IndexInfo info = idxInfoSerializer.deserialize(tracked); - headerLength = info.offset; - } - else - idxInfoSerializer.skip(tracked); - } - - return new LegacyShallowIndexedEntry(dataFilePosition, indexFilePosition, deletionTime, headerLength, offsets, idxInfoSerializer, size); - } - } - - static long deserializePositionAndSkip(DataInputPlus in) throws IOException - { - long position = in.readLong(); - - int size = in.readInt(); - if (size > 0) - in.skipBytesFully(size); - - return position; - } - } - - private static final class LegacyIndexInfoRetriever extends FileIndexInfoRetriever - { - private final int[] offsets; - - private LegacyIndexInfoRetriever(long indexFilePosition, int[] offsets, FileDataInput reader, IndexInfo.Serializer idxInfoSerializer) - { - super(indexFilePosition, reader, idxInfoSerializer); - this.offsets = offsets; - } - - IndexInfo fetchIndex(int index) throws IOException - { - retrievals++; - - // seek to posision of IndexInfo - indexReader.seek(indexInfoFilePosition + offsets[index]); - - // deserialize IndexInfo - return idxInfoSerializer.deserialize(indexReader); - } - } - /** * An entry in the row index for a row whose columns are indexed - used for both legacy and current formats. */ @@ -622,14 +450,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory for (int i = 0; i < columnsIndexCount; i++) this.columnsIndex[i] = idxInfoSerializer.deserialize(in); - int[] offsets = null; - if (version.storeRows()) - { - offsets = new int[this.columnsIndex.length]; - for (int i = 0; i < offsets.length; i++) - offsets[i] = in.readInt(); - } - this.offsets = offsets; + this.offsets = new int[this.columnsIndex.length]; + for (int i = 0; i < offsets.length; i++) + offsets[i] = in.readInt(); this.indexedPartSize = indexedPartSize;