http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index c3f036a..913a1de 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -18,18 +18,24 @@ package org.apache.cassandra.db; import java.io.IOException; -import java.util.Iterator; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -37,7 +43,10 @@ import org.apache.cassandra.metrics.TableMetrics; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.Pair; /** * General interface for storage-engine read commands (common to both range and @@ -51,6 +60,10 @@ public abstract class ReadCommand implements ReadQuery public static final IVersionedSerializer<ReadCommand> serializer = new Serializer(); + public static final IVersionedSerializer<ReadCommand> legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer(); + public static final IVersionedSerializer<ReadCommand> legacyReadCommandSerializer = new LegacyReadCommandSerializer(); + private final Kind kind; private final CFMetaData metadata; private final int nowInSec; @@ -72,9 +85,9 @@ public abstract class ReadCommand implements ReadQuery SINGLE_PARTITION (SinglePartitionReadCommand.selectionDeserializer), PARTITION_RANGE (PartitionRangeReadCommand.selectionDeserializer); - private SelectionDeserializer selectionDeserializer; + private final SelectionDeserializer selectionDeserializer; - private Kind(SelectionDeserializer selectionDeserializer) + Kind(SelectionDeserializer selectionDeserializer) { this.selectionDeserializer = selectionDeserializer; } @@ -251,8 +264,6 @@ public abstract class ReadCommand implements ReadQuery /** * Executes this command on the local host. * - * @param cfs the store for the table queried by this command. - * * @return an iterator over the result of executing this command locally. 
*/ @SuppressWarnings("resource") // The result iterator is closed upon exceptions (we know it's fine to potentially not close the intermediary @@ -281,7 +292,7 @@ public abstract class ReadCommand implements ReadQuery // we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it // would be more efficient (the sooner we discard stuff we know we don't care, the less useless // processing we do on it). - return limits().filter(rowFilter().filter(resultIterator, nowInSec()), nowInSec()); + return limits().filter(updatedFilter.filter(resultIterator, nowInSec()), nowInSec()); } catch (RuntimeException | Error e) { @@ -389,7 +400,7 @@ public abstract class ReadCommand implements ReadQuery logger.warn(msg); } - Tracing.trace("Read {} live and {} tombstone cells{}", new Object[]{ liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "") }); + Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); } } }; @@ -398,12 +409,16 @@ public abstract class ReadCommand implements ReadQuery /** * Creates a message for this command. */ - public MessageOut<ReadCommand> createMessage() + public MessageOut<ReadCommand> createMessage(int version) { - // TODO: we should use different verbs for old message (RANGE_SLICE, PAGED_RANGE) - return new MessageOut<>(MessagingService.Verb.READ, this, serializer); + if (version >= MessagingService.VERSION_30) + return new MessageOut<>(MessagingService.Verb.READ, this, serializer); + + return createLegacyMessage(); } + protected abstract MessageOut<ReadCommand> createLegacyMessage(); + protected abstract void appendCQLWhereClause(StringBuilder sb); // Skip purgeable tombstones. We do this because it's safe to do (post-merge of the memtable and sstable at least), it @@ -433,11 +448,11 @@ public abstract class ReadCommand implements ReadQuery { StringBuilder sb = new StringBuilder(); sb.append("SELECT ").append(columnFilter()); - sb.append(" FROM ").append(metadata().ksName).append(".").append(metadata.cfName); + sb.append(" FROM ").append(metadata().ksName).append('.').append(metadata.cfName); appendCQLWhereClause(sb); if (limits() != DataLimits.NONE) - sb.append(" ").append(limits()); + sb.append(' ').append(limits()); return sb.toString(); } @@ -465,8 +480,8 @@ public abstract class ReadCommand implements ReadQuery public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException { - if (version < MessagingService.VERSION_30) - throw new UnsupportedOperationException(); + // for serialization, createLegacyMessage() should cause legacyReadCommandSerializer to be used directly + assert version >= MessagingService.VERSION_30; out.writeByte(command.kind.ordinal()); out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift())); @@ -482,7 +497,7 @@ public abstract class ReadCommand implements ReadQuery public ReadCommand deserialize(DataInputPlus in, int version) throws IOException { if (version < MessagingService.VERSION_30) - throw new UnsupportedOperationException(); + return legacyReadCommandSerializer.deserialize(in, version); Kind kind = Kind.values()[in.readByte()]; int flags = in.readByte(); @@ -499,8 +514,8 @@ public abstract class ReadCommand implements ReadQuery public long serializedSize(ReadCommand command, int version) { - if (version < MessagingService.VERSION_30) - throw new UnsupportedOperationException(); + // for serialization, createLegacyMessage() should cause 
legacyReadCommandSerializer to be used directly + assert version >= MessagingService.VERSION_30; return 2 // kind + flags + CFMetaData.serializer.serializedSize(command.metadata(), version) @@ -511,4 +526,950 @@ public abstract class ReadCommand implements ReadQuery + command.selectionSerializedSize(version); } } + + private enum LegacyType + { + GET_BY_NAMES((byte)1), + GET_SLICES((byte)2); + + public final byte serializedValue; + + LegacyType(byte b) + { + this.serializedValue = b; + } + + public static LegacyType fromPartitionFilterKind(ClusteringIndexFilter.Kind kind) + { + return kind == ClusteringIndexFilter.Kind.SLICE + ? GET_SLICES + : GET_BY_NAMES; + } + + public static LegacyType fromSerializedValue(byte b) + { + return b == 1 ? GET_BY_NAMES : GET_SLICES; + } + } + + /** + * Serializer for pre-3.0 RangeSliceCommands. + */ + private static class LegacyRangeSliceCommandSerializer implements IVersionedSerializer<ReadCommand> + { + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + assert !rangeCommand.dataRange().isPaging(); + + // convert pre-3.0 incompatible names filters to slice filters + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + + CFMetaData metadata = rangeCommand.metadata(); + + out.writeUTF(metadata.ksName); + out.writeUTF(metadata.cfName); + out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis + + // begin DiskAtomFilterSerializer.serialize() + if (rangeCommand.isNamesQuery()) + { + out.writeByte(1); // 0 for slices, 1 for names + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, filter, out); + } + else + { + out.writeByte(0); // 0 for slices, 1 for names + + // slice filter serialization + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); + + out.writeBoolean(filter.isReversed()); + + // limit + DataLimits.Kind kind = rangeCommand.limits().kind(); + boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1; + if (isDistinct) + out.writeInt(1); + else + out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); + + int compositesToGroup; + boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + if (kind == DataLimits.Kind.THRIFT_LIMIT) + compositesToGroup = -1; + else if (isDistinct && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.isDense() ? 
-1 : metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + } + + serializeRowFilter(out, rangeCommand.rowFilter()); + AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); + + // maxResults + out.writeInt(rangeCommand.limits().count()); + + // countCQL3Rows + if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // if for Thrift or DISTINCT + out.writeBoolean(false); + else + out.writeBoolean(true); + + // isPaging + out.writeBoolean(false); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + String keyspace = in.readUTF(); + String columnFamily = in.readUTF(); + + CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); + if (metadata == null) + { + String message = String.format("Got legacy range command for nonexistent table %s.%s.", keyspace, columnFamily); + throw new UnknownColumnFamilyException(message, null); + } + + int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds + + ClusteringIndexFilter filter; + ColumnFilter selection; + int compositesToGroup = 0; + int perPartitionLimit = -1; + byte readType = in.readByte(); // 0 for slices, 1 for names + if (readType == 1) + { + Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata); + selection = selectionAndFilter.left; + filter = selectionAndFilter.right; + } + else + { + filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); + perPartitionLimit = in.readInt(); + compositesToGroup = in.readInt(); + selection = getColumnSelectionForSlice((ClusteringIndexSliceFilter) filter, compositesToGroup, metadata); + } + + RowFilter rowFilter = deserializeRowFilter(in, metadata); + + AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + int maxResults = in.readInt(); + + in.readBoolean(); // countCQL3Rows (not needed) + in.readBoolean(); // isPaging (not needed) + + boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); + boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics); + DataLimits limits; + if (isDistinct) + limits = DataLimits.distinctLimits(maxResults); + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(maxResults, perPartitionLimit); + else + limits = DataLimits.cqlLimits(maxResults); + + return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter)); + } + + static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rowFilter.iterator()); + out.writeInt(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out); + expression.operator().writeTo(out); + ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out); + } + } + + static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numRowFilters = in.readInt(); + if (numRowFilters == 0) + return RowFilter.NONE; + + RowFilter rowFilter = RowFilter.create(numRowFilters); + for (int i = 0; i < numRowFilters; i++) 
+ { + ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in); + ColumnDefinition column = metadata.getColumnDefinition(columnName); + Operator op = Operator.readFrom(in); + ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in); + rowFilter.add(column, op, indexValue); + } + return rowFilter; + } + + static long serializedRowFilterSize(RowFilter rowFilter) + { + long size = TypeSizes.sizeof(0); // rowFilterCount + for (RowFilter.Expression expression : rowFilter) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(0); // operator int value + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + return size; + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.PARTITION_RANGE; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + rangeCommand = maybeConvertNamesToSlice(rangeCommand); + CFMetaData metadata = rangeCommand.metadata(); + + long size = TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof(metadata.cfName); + size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); + + size += 1; // single byte flag: 0 for slices, 1 for names + if (rangeCommand.isNamesQuery()) + { + PartitionColumns columns = rangeCommand.columnFilter().fetchedColumns(); + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter; + size += LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, columns); + } + else + { + ClusteringIndexSliceFilter filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); + size += TypeSizes.sizeof(filter.isReversed()); + size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); + size += TypeSizes.sizeof(0); // compositesToGroup + } + + if (rangeCommand.rowFilter().equals(RowFilter.NONE)) + { + size += TypeSizes.sizeof(0); + } + else + { + ArrayList<RowFilter.Expression> indexExpressions = Lists.newArrayList(rangeCommand.rowFilter().iterator()); + size += TypeSizes.sizeof(indexExpressions.size()); + for (RowFilter.Expression expression : indexExpressions) + { + size += ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes); + size += TypeSizes.sizeof(expression.operator().ordinal()); + size += ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue()); + } + } + + size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); + size += TypeSizes.sizeof(rangeCommand.limits().count()); + size += TypeSizes.sizeof(!rangeCommand.isForThrift()); + return size + TypeSizes.sizeof(rangeCommand.dataRange().isPaging()); + } + + static PartitionRangeReadCommand maybeConvertNamesToSlice(PartitionRangeReadCommand command) + { + if (!command.dataRange().isNamesQuery()) + return command; + + CFMetaData metadata = command.metadata(); + if (!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) + return command; + + ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) command.dataRange().clusteringIndexFilter; + 
ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); + DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); + return new PartitionRangeReadCommand( + command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(), + command.columnFilter(), command.rowFilter(), command.limits(), newRange); + } + + static ColumnFilter getColumnSelectionForSlice(ClusteringIndexSliceFilter filter, int compositesToGroup, CFMetaData metadata) + { + // A value of -2 indicates this is a DISTINCT query that doesn't select static columns, only partition keys. + if (compositesToGroup == -2) + return ColumnFilter.selection(PartitionColumns.NONE); + + // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all + PartitionColumns columns = filter.selects(Clustering.STATIC_CLUSTERING) + ? metadata.partitionColumns() + : metadata.partitionColumns().withoutStatics(); + return new ColumnFilter.Builder(metadata).addAll(columns).build(); + } + } + + /** + * Serializer for pre-3.0 PagedRangeCommands. + */ + private static class LegacyPagedRangeCommandSerializer implements IVersionedSerializer<ReadCommand> + { + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + assert rangeCommand.dataRange().isPaging(); + + CFMetaData metadata = rangeCommand.metadata(); + + out.writeUTF(metadata.ksName); + out.writeUTF(metadata.cfName); + out.writeLong(rangeCommand.nowInSec() * 1000L); // convert from seconds to millis + + AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(), out, version); + + // pre-3.0 nodes don't accept names filters for paged range commands + ClusteringIndexSliceFilter filter; + if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES) + filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata); + else + filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + // slice filter + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + LegacyReadCommandSerializer.serializeSlices(out, filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata); + out.writeBoolean(filter.isReversed()); + + // slice filter's count + DataLimits.Kind kind = rangeCommand.limits().kind(); + boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1; + if (isDistinct) + out.writeInt(1); + else + out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(), filter.requestedSlices())); + + // compositesToGroup + boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + int compositesToGroup; + if (kind == DataLimits.Kind.THRIFT_LIMIT) + compositesToGroup = -1; + else if (isDistinct && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.isDense() ? 
-1 : metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + + // command-level "start" and "stop" composites. The start is the last-returned cell name if there is one, + // otherwise it's the same as the slice filter's start. The stop appears to always be the same as the + // slice filter's stop. + DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange(); + Clustering lastReturned = pagingRange.getLastReturned(); + Slice.Bound newStart = Slice.Bound.exclusiveStartOf(lastReturned); + Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1); + ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, newStart, true), out); + ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering()), out); + + LegacyRangeSliceCommandSerializer.serializeRowFilter(out, rangeCommand.rowFilter()); + + // command-level limit + // Pre-3.0 we would always request one more row than we actually needed and the command-level "start" would + // be the last-returned cell name, so the response would always include it. When dealing with compound comparators, + // we can pass an exclusive start and use the normal limit. However, when dealing with non-compound comparators, + // pre-3.0 nodes cannot perform exclusive slices, so we need to request one extra row. + int maxResults = rangeCommand.limits().count() + (metadata.isCompound() ? 0 : 1); + out.writeInt(maxResults); + + // countCQL3Rows + if (rangeCommand.isForThrift() || rangeCommand.limits().perPartitionCount() == 1) // for Thrift or DISTINCT + out.writeBoolean(false); + else + out.writeBoolean(true); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + + String keyspace = in.readUTF(); + String columnFamily = in.readUTF(); + + CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); + if (metadata == null) + { + String message = String.format("Got legacy paged range command for nonexistent table %s.%s.", keyspace, columnFamily); + throw new UnknownColumnFamilyException(message, null); + } + + int nowInSec = (int) (in.readLong() / 1000); // convert from millis to seconds + AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); + + ClusteringIndexSliceFilter filter = LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata); + int perPartitionLimit = in.readInt(); + int compositesToGroup = in.readInt(); + + // command-level Composite "start" and "stop" + LegacyLayout.LegacyBound startBound = LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), true); + ByteBufferUtil.readWithShortLength(in); // the composite "stop", which isn't actually needed + + // pre-3.0 nodes will sometimes use a clustering prefix for the Command-level start and stop, but in all + // cases this should also be represented by the ClusteringIndexFilter, so we can ignore them + Clustering startClustering; + if (startBound == LegacyLayout.LegacyBound.BOTTOM || startBound.bound.size() < metadata.comparator.size()) + startClustering = Clustering.EMPTY; + else + startClustering = startBound.getAsClustering(metadata); + + ColumnFilter selection = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(filter, compositesToGroup, metadata); + + RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata); + int maxResults = 
in.readInt(); + in.readBoolean(); // countCQL3Rows + + + boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); + boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics); + DataLimits limits; + if (isDistinct) + limits = DataLimits.distinctLimits(maxResults); + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(1, perPartitionLimit); // we only use paging w/ thrift for get_count(), so partition limit must be 1 + else + limits = DataLimits.cqlLimits(maxResults); + + limits = limits.forPaging(maxResults); + + // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it + // missing without any problems, so we can safely always set "inclusive" to false in the data range + DataRange dataRange = new DataRange(keyRange, filter).forPaging(keyRange, metadata.comparator, startClustering, false); + return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, dataRange); + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.PARTITION_RANGE; + + PartitionRangeReadCommand rangeCommand = (PartitionRangeReadCommand) command; + CFMetaData metadata = rangeCommand.metadata(); + assert rangeCommand.dataRange().isPaging(); + + long size = TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof(metadata.cfName); + size += TypeSizes.sizeof((long) rangeCommand.nowInSec()); + + size += AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(), version); + + // pre-3.0 nodes only accept slice filters for paged range commands + ClusteringIndexSliceFilter filter; + if (rangeCommand.dataRange().clusteringIndexFilter.kind() == ClusteringIndexFilter.Kind.NAMES) + filter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter, metadata); + else + filter = (ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter; + + // slice filter + boolean makeStaticSlice = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && !filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); + size += LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), makeStaticSlice, metadata); + size += TypeSizes.sizeof(filter.isReversed()); + + // slice filter's count + size += TypeSizes.sizeof(rangeCommand.limits().perPartitionCount()); + + // compositesToGroup + size += TypeSizes.sizeof(0); + + // command-level Composite "start" and "stop" + DataRange.Paging pagingRange = (DataRange.Paging) rangeCommand.dataRange(); + Clustering lastReturned = pagingRange.getLastReturned(); + Slice lastSlice = filter.requestedSlices().get(filter.requestedSlices().size() - 1); + size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastReturned)); + size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata, lastSlice.end().clustering())); + + size += LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter()); + + // command-level limit + size += TypeSizes.sizeof(rangeCommand.limits().count()); + + // countCQL3Rows + return size + TypeSizes.sizeof(true); + } + } + + /** + * Serializer for pre-3.0 ReadCommands. 
+ */ + static class LegacyReadCommandSerializer implements IVersionedSerializer<ReadCommand> + { + public void serialize(ReadCommand command, DataOutputPlus out, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.SINGLE_PARTITION; + + SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command; + singleReadCommand = maybeConvertNamesToSlice(singleReadCommand); + + CFMetaData metadata = singleReadCommand.metadata(); + + out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue); + + out.writeBoolean(singleReadCommand.isDigestQuery()); + out.writeUTF(metadata.ksName); + ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), out); + out.writeUTF(metadata.cfName); + out.writeLong(singleReadCommand.nowInSec() * 1000L); // convert from seconds to millis + + if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE) + serializeSliceCommand((SinglePartitionSliceCommand) singleReadCommand, out); + else + serializeNamesCommand((SinglePartitionNamesCommand) singleReadCommand, out); + } + + public ReadCommand deserialize(DataInputPlus in, int version) throws IOException + { + assert version < MessagingService.VERSION_30; + LegacyType msgType = LegacyType.fromSerializedValue(in.readByte()); + + boolean isDigest = in.readBoolean(); + String keyspaceName = in.readUTF(); + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + String cfName = in.readUTF(); + long nowInMillis = in.readLong(); + int nowInSeconds = (int) (nowInMillis / 1000); // convert from millis to seconds + CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName); + DecoratedKey dk = metadata.partitioner.decorateKey(key); + + switch (msgType) + { + case GET_BY_NAMES: + return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds); + case GET_SLICES: + return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds); + default: + throw new AssertionError(); + } + } + + public long serializedSize(ReadCommand command, int version) + { + assert version < MessagingService.VERSION_30; + assert command.kind == Kind.SINGLE_PARTITION; + SinglePartitionReadCommand singleReadCommand = (SinglePartitionReadCommand) command; + singleReadCommand = maybeConvertNamesToSlice(singleReadCommand); + + int keySize = singleReadCommand.partitionKey().getKey().remaining(); + + CFMetaData metadata = singleReadCommand.metadata(); + + long size = 1; // message type (single byte) + size += TypeSizes.sizeof(command.isDigestQuery()); + size += TypeSizes.sizeof(metadata.ksName); + size += TypeSizes.sizeof((short) keySize) + keySize; + size += TypeSizes.sizeof((long) command.nowInSec()); + + if (singleReadCommand.clusteringIndexFilter().kind() == ClusteringIndexFilter.Kind.SLICE) + return size + serializedSliceCommandSize((SinglePartitionSliceCommand) singleReadCommand); + else + return size + serializedNamesCommandSize((SinglePartitionNamesCommand) singleReadCommand); + } + + private void serializeNamesCommand(SinglePartitionNamesCommand command, DataOutputPlus out) throws IOException + { + serializeNamesFilter(command, command.clusteringIndexFilter(), out); + } + + + private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException + { + PartitionColumns columns = command.columnFilter().fetchedColumns(); + CFMetaData metadata = command.metadata(); + 
SortedSet<Clustering> requestedRows = filter.requestedRows(); + + if (requestedRows.isEmpty()) + { + // only static columns are requested + out.writeInt(columns.size()); + for (ColumnDefinition column : columns) + ByteBufferUtil.writeWithShortLength(column.name.bytes, out); + } + else + { + out.writeInt(requestedRows.size() * columns.size()); + for (Clustering clustering : requestedRows) + { + for (ColumnDefinition column : columns) + ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null), out); + } + } + + // countCql3Rows should be true if it's not for Thrift or a DISTINCT query + if (command.isForThrift() || (command.limits().kind() == DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1)) + out.writeBoolean(false); // it's compact and not a DISTINCT query + else + out.writeBoolean(true); + } + + static long serializedNamesFilterSize(ClusteringIndexNamesFilter filter, CFMetaData metadata, PartitionColumns fetchedColumns) + { + SortedSet<Clustering> requestedRows = filter.requestedRows(); + + long size = 0; + if (requestedRows.isEmpty()) + { + // only static columns are requested + size += TypeSizes.sizeof(fetchedColumns.size()); + for (ColumnDefinition column : fetchedColumns) + size += ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes); + } + else + { + size += TypeSizes.sizeof(requestedRows.size() * fetchedColumns.size()); + for (Clustering clustering : requestedRows) + { + for (ColumnDefinition column : fetchedColumns) + size += ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeCellName(metadata, clustering, column.name.bytes, null)); + } + } + + return size + TypeSizes.sizeof(true); // countCql3Rows + } + + private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException + { + Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata); + + // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift + return new SinglePartitionNamesCommand( + isDigest, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE, + key, selectionAndFilter.right); + } + + static Pair<ColumnFilter, ClusteringIndexNamesFilter> deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numCellNames = in.readInt(); + + // The names filter could include either a) static columns or b) normal columns with the clustering columns + // fully specified. We need to handle those cases differently in 3.0. + NavigableSet<Clustering> clusterings = new TreeSet<>(metadata.comparator); + + ColumnFilter.Builder selectionBuilder = new ColumnFilter.Builder(metadata); + for (int i = 0; i < numCellNames; i++) + { + ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in); + LegacyLayout.LegacyCellName cellName; + try + { + cellName = LegacyLayout.decodeCellName(metadata, buffer); + } + catch (UnknownColumnException exc) + { + // TODO this probably needs a new exception class that shares a parent with UnknownColumnFamilyException + throw new UnknownColumnFamilyException( + "Received legacy range read command with names filter for unrecognized column name. 
" + + "Fill name in filter (hex): " + ByteBufferUtil.bytesToHex(buffer), metadata.cfId); + } + + if (!cellName.clustering.equals(Clustering.STATIC_CLUSTERING)) + clusterings.add(cellName.clustering); + + selectionBuilder.add(cellName.column); + } + + in.readBoolean(); // countCql3Rows + + // clusterings cannot include STATIC_CLUSTERING, so if the names filter is for static columns, clusterings + // will be empty. However, by requesting the static columns in our ColumnFilter, this will still work. + ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false); + return Pair.create(selectionBuilder.build(), filter); + } + + private long serializedNamesCommandSize(SinglePartitionNamesCommand command) + { + ClusteringIndexNamesFilter filter = command.clusteringIndexFilter(); + PartitionColumns columns = command.columnFilter().fetchedColumns(); + return serializedNamesFilterSize(filter, command.metadata(), columns); + } + + private void serializeSliceCommand(SinglePartitionSliceCommand command, DataOutputPlus out) throws IOException + { + CFMetaData metadata = command.metadata(); + ClusteringIndexSliceFilter filter = command.clusteringIndexFilter(); + + Slices slices = filter.requestedSlices(); + boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING); + serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, metadata); + + out.writeBoolean(filter.isReversed()); + + boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING); + DataLimits.Kind kind = command.limits().kind(); + boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1; + if (isDistinct) + out.writeInt(1); // the limit is always 1 for DISTINCT queries + else + out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices())); + + int compositesToGroup; + if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense()) + compositesToGroup = -1; + else if (isDistinct && !selectsStatics) + compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) + else + compositesToGroup = metadata.clusteringColumns().size(); + + out.writeInt(compositesToGroup); + } + + private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException + { + ClusteringIndexSliceFilter filter = deserializeSlicePartitionFilter(in, metadata); + int count = in.readInt(); + int compositesToGroup = in.readInt(); + + // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all + boolean selectsStatics = filter.selects(Clustering.STATIC_CLUSTERING); + PartitionColumns columns = selectsStatics + ? 
metadata.partitionColumns() + : metadata.partitionColumns().withoutStatics(); + ColumnFilter columnFilter = new ColumnFilter.Builder(metadata).addAll(columns).build(); + + boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics); + DataLimits limits; + if (compositesToGroup == -2 || isDistinct) + limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value + else if (compositesToGroup == -1) + limits = DataLimits.thriftLimits(1, count); + else + limits = DataLimits.cqlLimits(count); + + // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift + return new SinglePartitionSliceCommand(isDigest, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter); + } + + private long serializedSliceCommandSize(SinglePartitionSliceCommand command) + { + CFMetaData metadata = command.metadata(); + ClusteringIndexSliceFilter filter = command.clusteringIndexFilter(); + + Slices slices = filter.requestedSlices(); + boolean makeStaticSlice = !command.columnFilter().fetchedColumns().statics.isEmpty() && !slices.selects(Clustering.STATIC_CLUSTERING); + + long size = serializedSlicesSize(slices, makeStaticSlice, metadata); + size += TypeSizes.sizeof(command.clusteringIndexFilter().isReversed()); + size += TypeSizes.sizeof(command.limits().count()); + return size + TypeSizes.sizeof(0); // compositesToGroup + } + + static void serializeSlices(DataOutputPlus out, Slices slices, boolean isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException + { + out.writeInt(slices.size() + (makeStaticSlice ? 1 : 0)); + + // In 3.0 we always store the slices in normal comparator order. Pre-3.0 nodes expect the slices to + // be in reversed order if the query is reversed, so we handle that here. 
+ if (isReversed) + { + for (int i = slices.size() - 1; i >= 0; i--) + serializeSlice(out, slices.get(i), true, metadata); + if (makeStaticSlice) + serializeStaticSlice(out, true, metadata); + } + else + { + if (makeStaticSlice) + serializeStaticSlice(out, false, metadata); + for (Slice slice : slices) + serializeSlice(out, slice, false, metadata); + } + } + + static long serializedSlicesSize(Slices slices, boolean makeStaticSlice, CFMetaData metadata) + { + long size = TypeSizes.sizeof(slices.size()); + + for (Slice slice : slices) + { + ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, slice.start(), true); + size += ByteBufferUtil.serializedSizeWithShortLength(sliceStart); + ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, slice.end(), false); + size += ByteBufferUtil.serializedSizeWithShortLength(sliceEnd); + } + + if (makeStaticSlice) + size += serializedStaticSliceSize(metadata); + + return size; + } + + static long serializedStaticSliceSize(CFMetaData metadata) + { + // unlike serializeStaticSlice(), but we don't care about reversal for size calculations + ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false); + long size = ByteBufferUtil.serializedSizeWithShortLength(sliceStart); + + size += TypeSizes.sizeof((short) (metadata.comparator.size() * 3 + 2)); + size += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX); + for (int i = 0; i < metadata.comparator.size(); i++) + { + size += ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER); + size += 1; // EOC + } + return size; + } + + private static void serializeSlice(DataOutputPlus out, Slice slice, boolean isReversed, CFMetaData metadata) throws IOException + { + ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, isReversed ? slice.end() : slice.start(), !isReversed); + ByteBufferUtil.writeWithShortLength(sliceStart, out); + + ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, isReversed ? slice.start() : slice.end(), isReversed); + ByteBufferUtil.writeWithShortLength(sliceEnd, out); + } + + private static void serializeStaticSlice(DataOutputPlus out, boolean isReversed, CFMetaData metadata) throws IOException + { + // if reversed, write an empty bound for the slice start; if reversed, write out an empty bound for the + // slice finish after we've written the static slice start + if (!isReversed) + { + ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false); + ByteBufferUtil.writeWithShortLength(sliceStart, out); + } + + // write out the length of the composite + out.writeShort(2 + metadata.comparator.size() * 3); // two bytes + EOC for each component, plus static prefix + out.writeShort(LegacyLayout.STATIC_PREFIX); + for (int i = 0; i < metadata.comparator.size(); i++) + { + ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out); + // write the EOC, using an inclusive end if we're on the final component + out.writeByte(i == metadata.comparator.size() - 1 ? 
1 : 0); + } + + if (isReversed) + { + ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, Slice.Bound.BOTTOM, false); + ByteBufferUtil.writeWithShortLength(sliceStart, out); + } + } + + static ClusteringIndexSliceFilter deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws IOException + { + int numSlices = in.readInt(); + ByteBuffer[] startBuffers = new ByteBuffer[numSlices]; + ByteBuffer[] finishBuffers = new ByteBuffer[numSlices]; + for (int i = 0; i < numSlices; i++) + { + startBuffers[i] = ByteBufferUtil.readWithShortLength(in); + finishBuffers[i] = ByteBufferUtil.readWithShortLength(in); + } + + // we have to know if the query is reversed before we can correctly build the slices + boolean reversed = in.readBoolean(); + + Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator); + for (int i = 0; i < numSlices; i++) + { + Slice.Bound start, finish; + if (!reversed) + { + start = LegacyLayout.decodeBound(metadata, startBuffers[i], true).bound; + finish = LegacyLayout.decodeBound(metadata, finishBuffers[i], false).bound; + } + else + { + // pre-3.0, reversed query slices put the greater element at the start of the slice + finish = LegacyLayout.decodeBound(metadata, startBuffers[i], false).bound; + start = LegacyLayout.decodeBound(metadata, finishBuffers[i], true).bound; + } + slicesBuilder.add(Slice.make(start, finish)); + } + + return new ClusteringIndexSliceFilter(slicesBuilder.build(), reversed); + } + + private static SinglePartitionReadCommand maybeConvertNamesToSlice(SinglePartitionReadCommand command) + { + if (command.clusteringIndexFilter().kind() != ClusteringIndexFilter.Kind.NAMES) + return command; + + CFMetaData metadata = command.metadata(); + + if (!shouldConvertNamesToSlice(metadata, command.columnFilter().fetchedColumns())) + return command; + + ClusteringIndexNamesFilter filter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter(); + ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata); + return new SinglePartitionSliceCommand( + command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(), + command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter); + } + + /** + * Returns true if a names filter on the given table and column selection should be converted to a slice + * filter for compatibility with pre-3.0 nodes, false otherwise. + */ + static boolean shouldConvertNamesToSlice(CFMetaData metadata, PartitionColumns columns) + { + // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice for CQL3 tables (not dense, composite). + if (!metadata.isDense() && metadata.isCompound()) + return true; + + // pre-3.0 nodes don't support names filters for reading collections, so if we're requesting any of those, + // we need to convert this to a slice filter + for (ColumnDefinition column : columns) + { + if (column.type.isMultiCell()) + return true; + } + return false; + } + + /** + * Converts a names filter that is incompatible with pre-3.0 nodes to a slice filter that is compatible. 
+ */ + private static ClusteringIndexSliceFilter convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData metadata) + { + SortedSet<Clustering> requestedRows = filter.requestedRows(); + Slices slices; + if (requestedRows.isEmpty() || requestedRows.size() == 1 && requestedRows.first().size() == 0) + { + slices = Slices.ALL; + } + else + { + Slices.Builder slicesBuilder = new Slices.Builder(metadata.comparator); + for (Clustering clustering : requestedRows) + slicesBuilder.add(Slice.Bound.inclusiveStartOf(clustering), Slice.Bound.inclusiveEndOf(clustering)); + slices = slicesBuilder.build(); + } + + return new ClusteringIndexSliceFilter(slices, filter.isReversed()); + } + + /** + * Potentially increases the existing query limit to account for the lack of exclusive bounds in pre-3.0 nodes. + * @param limit the existing query limit + * @param slices the requested slices + * @return the updated limit + */ + static int updateLimitForQuery(int limit, Slices slices) + { + // Pre-3.0 nodes don't support exclusive bounds for slices. Instead, we query one more element if necessary + // and filter it later (in LegacyRemoteDataResponse) + if (!slices.hasLowerBound() && ! slices.hasUpperBound()) + return limit; + + for (Slice slice : slices) + { + if (limit == Integer.MAX_VALUE) + return limit; + + if (!slice.start().isInclusive()) + limit++; + if (!slice.end().isInclusive()) + limit++; + } + return limit; + } + } }
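A note on the limit adjustment above: pre-3.0 nodes cannot evaluate exclusive slice bounds, so updateLimitForQuery() requests one extra row per exclusive bound and the surplus is trimmed later (in LegacyRemoteDataResponse). The following is a minimal standalone sketch of that adjustment, using a hypothetical SliceBounds stand-in rather than Cassandra's Slices/Slice.Bound classes; it is illustrative only, not the committed code.

import java.util.List;

public final class LegacyLimitSketch
{
    /** Hypothetical stand-in for a slice; the real code works with Slices/Slice.Bound. */
    record SliceBounds(boolean startInclusive, boolean endInclusive) {}

    // Mirrors the per-slice adjustment in updateLimitForQuery(): bump the limit once per
    // exclusive bound, because pre-3.0 peers only evaluate inclusive bounds and the extra
    // rows are filtered out after the response comes back. (The real method also returns
    // early when the slices have no lower or upper bound, i.e. select everything.)
    static int updateLimitForQuery(int limit, List<SliceBounds> slices)
    {
        for (SliceBounds slice : slices)
        {
            if (limit == Integer.MAX_VALUE)
                return limit;
            if (!slice.startInclusive())
                limit++;
            if (!slice.endInclusive())
                limit++;
        }
        return limit;
    }

    public static void main(String[] args)
    {
        // A single slice that is exclusive on both ends asks for two extra rows.
        System.out.println(updateLimitForQuery(10, List.of(new SliceBounds(false, false)))); // prints 12
    }
}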
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index f85d406..9cde8dc 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -17,8 +17,8 @@ */ package org.apache.cassandra.db; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -28,6 +28,11 @@ import org.apache.cassandra.tracing.Tracing; public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> { + protected IVersionedSerializer<ReadResponse> serializer() + { + return ReadResponse.serializer; + } + public void doVerb(MessageIn<ReadCommand> message, int id) { if (StorageService.instance.isBootstrapMode()) @@ -42,7 +47,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> response = command.createResponse(iterator); } - MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, ReadResponse.serializer); + MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer()); Tracing.trace("Enqueuing response to {}", message.from); MessagingService.instance().sendReply(reply, id, message.from); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 90bd21d..b3cc725 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -20,8 +20,12 @@ package org.apache.cassandra.db; import java.io.*; import java.nio.ByteBuffer; import java.security.MessageDigest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.IVersionedSerializer; @@ -30,6 +34,7 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -57,8 +62,10 @@ public abstract class ReadResponse return new DigestResponse(makeDigest(data)); } - public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata); - public abstract ByteBuffer digest(CFMetaData metadata); + public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command); + + public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command); + public abstract boolean isDigestQuery(); protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator) @@ -79,12 
+86,12 @@ public abstract class ReadResponse this.digest = digest; } - public UnfilteredPartitionIterator makeIterator(CFMetaData metadata) + public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command) { throw new UnsupportedOperationException(); } - public ByteBuffer digest(CFMetaData metadata) + public ByteBuffer digest(CFMetaData metadata, ReadCommand command) { return digest; } @@ -124,7 +131,7 @@ public abstract class ReadResponse } } - public UnfilteredPartitionIterator makeIterator(CFMetaData metadata) + public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command) { try { @@ -138,9 +145,76 @@ public abstract class ReadResponse } } - public ByteBuffer digest(CFMetaData metadata) + public ByteBuffer digest(CFMetaData metadata, ReadCommand command) { - try (UnfilteredPartitionIterator iterator = makeIterator(metadata)) + try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) + { + return makeDigest(iterator); + } + } + + public boolean isDigestQuery() + { + return false; + } + } + + /** + * A remote response from a pre-3.0 node. This needs a separate class in order to cleanly handle trimming and + * reversal of results when the read command calls for it. Pre-3.0 nodes always return results in the normal + * sorted order, even if the query asks for reversed results. Additionally, pre-3.0 nodes do not have a notion of + * exclusive slices on non-composite tables, so extra rows may need to be trimmed. + */ + private static class LegacyRemoteDataResponse extends ReadResponse + { + private final List<ArrayBackedPartition> partitions; + + private LegacyRemoteDataResponse(List<ArrayBackedPartition> partitions) + { + super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata + this.partitions = partitions; + } + + public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command) + { + return new AbstractUnfilteredPartitionIterator() + { + private int idx; + + public boolean isForThrift() + { + return true; + } + + public CFMetaData metadata() + { + return metadata; + } + + public boolean hasNext() + { + return idx < partitions.size(); + } + + public UnfilteredRowIterator next() + { + ArrayBackedPartition partition = partitions.get(idx++); + + ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey()); + + // Pre-3.0, we didn't have a way to express exclusivity for non-composite comparators, so all slices were + // inclusive on both ends. If we have exclusive slice ends, we need to filter the results here. + if (!command.metadata().isCompound()) + return filter.filter(partition.sliceableUnfilteredIterator(command.columnFilter(), filter.isReversed())); + + return partition.unfilteredIterator(command.columnFilter(), Slices.ALL, filter.isReversed()); + } + }; + } + + public ByteBuffer digest(CFMetaData metadata, ReadCommand command) + { + try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) { return makeDigest(iterator); } @@ -156,14 +230,32 @@ public abstract class ReadResponse { public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException { + boolean isDigest = response instanceof DigestResponse; + ByteBuffer digest = isDigest ? 
((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (version < MessagingService.VERSION_30) { - // TODO - throw new UnsupportedOperationException(); + out.writeInt(digest.remaining()); + out.write(digest); + out.writeBoolean(isDigest); + if (!isDigest) + { + assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side + try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null)) + { + assert iter.hasNext(); + try (UnfilteredRowIterator partition = iter.next()) + { + ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); + LegacyLayout.serializeAsLegacyPartition(partition, out, version); + } + assert !iter.hasNext(); + } + } + return; } - boolean isDigest = response.isDigestQuery(); - ByteBufferUtil.writeWithVIntLength(isDigest ? response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER, out); + ByteBufferUtil.writeWithVIntLength(digest, out); if (!isDigest) { // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the @@ -178,8 +270,35 @@ public abstract class ReadResponse { if (version < MessagingService.VERSION_30) { - // TODO - throw new UnsupportedOperationException(); + byte[] digest = null; + int digestSize = in.readInt(); + if (digestSize > 0) + { + digest = new byte[digestSize]; + in.readFully(digest, 0, digestSize); + } + boolean isDigest = in.readBoolean(); + assert isDigest == digestSize > 0; + if (isDigest) + { + assert digest != null; + return new DigestResponse(ByteBuffer.wrap(digest)); + } + + // ReadResponses from older versions are always single-partition (ranges are handled by RangeSliceReply) + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + UnfilteredRowIterator rowIterator = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key); + if (rowIterator == null) + return new LegacyRemoteDataResponse(Collections.emptyList()); + + try + { + return new LegacyRemoteDataResponse(Collections.singletonList(ArrayBackedPartition.create(rowIterator))); + } + finally + { + rowIterator.close(); + } } ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in); @@ -193,14 +312,32 @@ public abstract class ReadResponse public long serializedSize(ReadResponse response, int version) { + boolean isDigest = response instanceof DigestResponse; + ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (version < MessagingService.VERSION_30) { - // TODO - throw new UnsupportedOperationException(); + long size = TypeSizes.sizeof(digest.remaining()) + + digest.remaining() + + TypeSizes.sizeof(isDigest); + if (!isDigest) + { + assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side + try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null)) + { + assert iter.hasNext(); + try (UnfilteredRowIterator partition = iter.next()) + { + size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); + size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version); + } + assert !iter.hasNext(); + } + } + return size; } - boolean isDigest = response.isDigestQuery(); - long size = ByteBufferUtil.serializedSizeWithVIntLength(isDigest ? 
response.digest(response.metadata) : ByteBufferUtil.EMPTY_BYTE_BUFFER); + long size = ByteBufferUtil.serializedSizeWithVIntLength(digest); if (!isDigest) { // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the @@ -217,32 +354,75 @@ public abstract class ReadResponse { public void serialize(ReadResponse response, DataOutputPlus out, int version) throws IOException { - // TODO - throw new UnsupportedOperationException(); - // out.writeInt(rsr.rows.size()); - // for (Row row : rsr.rows) - // Row.serializer.serialize(row, out, version); + assert version < MessagingService.VERSION_30; + + // determine the number of partitions upfront for serialization + int numPartitions = 0; + assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + { + while (iterator.hasNext()) + { + try (UnfilteredRowIterator atomIterator = iterator.next()) + { + numPartitions++; + + // we have to fully exhaust the subiterator + while (atomIterator.hasNext()) + atomIterator.next(); + } + } + } + + out.writeInt(numPartitions); + + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + { + while (iterator.hasNext()) + { + try (UnfilteredRowIterator partition = iterator.next()) + { + ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); + LegacyLayout.serializeAsLegacyPartition(partition, out, version); + } + } + } } public ReadResponse deserialize(DataInputPlus in, int version) throws IOException { - // TODO - throw new UnsupportedOperationException(); - // int rowCount = in.readInt(); - // List<Row> rows = new ArrayList<Row>(rowCount); - // for (int i = 0; i < rowCount; i++) - // rows.add(Row.serializer.deserialize(in, version)); - // return new RangeSliceReply(rows); + // Contrarily to serialize, we have to read the number of serialized partitions here. 
+ int partitionCount = in.readInt(); + ArrayList<ArrayBackedPartition> partitions = new ArrayList<>(partitionCount); + for (int i = 0; i < partitionCount; i++) + { + ByteBuffer key = ByteBufferUtil.readWithShortLength(in); + try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key)) + { + partitions.add(ArrayBackedPartition.create(partition)); + } + } + return new LegacyRemoteDataResponse(partitions); } public long serializedSize(ReadResponse response, int version) { - // TODO - throw new UnsupportedOperationException(); - // int size = TypeSizes.sizeof(rsr.rows.size()); - // for (Row row : rsr.rows) - // size += Row.serializer.serializedSize(row, version); - // return size; + assert version < MessagingService.VERSION_30; + long size = TypeSizes.sizeof(0); // number of partitions + + assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + { + while (iterator.hasNext()) + { + try (UnfilteredRowIterator partition = iterator.next()) + { + size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); + size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version); + } + } + } + return size; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index bb184e8..1b688c9 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -32,6 +32,8 @@ import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.*; import org.apache.cassandra.service.pager.*; import org.apache.cassandra.tracing.Tracing; @@ -257,7 +259,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp) { assert !cfs.isIndex(); // CASSANDRA-5732 - assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [" + cfs.name + "]"); + assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name); UUID cfId = metadata().cfId; RowCacheKey key = new RowCacheKey(cfId, partitionKey()); @@ -393,6 +395,11 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter nowInSec()); } + protected MessageOut<ReadCommand> createLegacyMessage() + { + return new MessageOut<>(MessagingService.Verb.READ, this, legacyReadCommandSerializer); + } + protected void appendCQLWhereClause(StringBuilder sb) { sb.append(" WHERE "); @@ -509,5 +516,5 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter else return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter); } - }; + } } 
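The LegacyRemoteDataResponse introduced in the ReadResponse diff above compensates for two pre-3.0 behaviours: older peers have no notion of exclusive slice ends on non-compound tables (every slice they answer is inclusive on both ends), and they always return rows in forward clustering order even when the query asks for reversed results, so the receiving 3.0 node has to trim and, where needed, reorder. The sketch below is a minimal, self-contained model of the trimming step only; it uses plain strings in place of clusterings and hypothetical names (LegacySliceTrim, trimExclusive), not the Cassandra API, where the same effect comes from re-running the partition through command.clusteringIndexFilter(...).filter(...) when metadata().isCompound() is false.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of the post-filtering a 3.0 node applies to rows coming back from a
// pre-3.0 peer: the peer only understands inclusive slice ends, so rows that an
// exclusive 3.0 bound should have excluded are dropped on receipt. Strings stand
// in for clusterings; trimExclusive is a hypothetical name, not the Cassandra API.
public class LegacySliceTrim
{
    public static List<String> trimExclusive(List<String> rowsFromLegacyPeer,
                                             String start, boolean startInclusive,
                                             String end, boolean endInclusive,
                                             Comparator<String> cmp)
    {
        List<String> trimmed = new ArrayList<>();
        for (String row : rowsFromLegacyPeer)
        {
            int cs = cmp.compare(row, start);
            int ce = cmp.compare(row, end);
            boolean afterStart = startInclusive ? cs >= 0 : cs > 0;
            boolean beforeEnd = endInclusive ? ce <= 0 : ce < 0;
            if (afterStart && beforeEnd)
                trimmed.add(row);
        }
        return trimmed;
    }

    public static void main(String[] args)
    {
        // A legacy peer asked for the slice (a, c] answers it inclusively and returns a as well;
        // the receiving side trims it before the rows reach the rest of the read path.
        List<String> fromPeer = List.of("a", "b", "c");
        System.out.println(trimExclusive(fromPeer, "a", false, "c", true, Comparator.naturalOrder())); // [b, c]
    }
}

In other words, an inclusive-only peer asked for (a, c] also ships a over the wire, and the extra row is discarded on the receiving side before the rest of the read path ever sees it.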
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/Slice.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java index 2ffb91e..7fde45e 100644 --- a/src/java/org/apache/cassandra/db/Slice.java +++ b/src/java/org/apache/cassandra/db/Slice.java @@ -19,15 +19,12 @@ package org.apache.cassandra.db; import java.io.IOException; import java.nio.ByteBuffer; -import java.security.MessageDigest; import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.ObjectSizes; /** * A slice represents the selection of a range of rows. @@ -83,11 +80,10 @@ public class Slice public static Slice make(ClusteringComparator comparator, Object... values) { CBuilder builder = CBuilder.create(comparator); - for (int i = 0; i < values.length; i++) + for (Object val : values) { - Object val = values[i]; if (val instanceof ByteBuffer) - builder.add((ByteBuffer)val); + builder.add((ByteBuffer) val); else builder.add(val); } @@ -208,6 +204,9 @@ public class Slice */ public Slice forPaging(ClusteringComparator comparator, Clustering lastReturned, boolean inclusive, boolean reversed) { + if (lastReturned == null) + return this; + if (reversed) { int cmp = comparator.compare(lastReturned, start); @@ -286,14 +285,14 @@ public class Slice for (int i = 0; i < start.size(); i++) { if (i > 0) - sb.append(":"); + sb.append(':'); sb.append(comparator.subtype(i).getString(start.get(i))); } sb.append(", "); for (int i = 0; i < end.size(); i++) { if (i > 0) - sb.append(":"); + sb.append(':'); sb.append(comparator.subtype(i).getString(end.get(i))); } sb.append(end.isInclusive() ? "]" : ")"); @@ -394,14 +393,37 @@ public class Slice return create(Kind.EXCL_END_BOUND, values); } + public static Bound inclusiveStartOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return inclusiveStartOf(values); + } + + public static Bound exclusiveStartOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return exclusiveStartOf(values); + } + + public static Bound inclusiveEndOf(ClusteringPrefix prefix) + { + ByteBuffer[] values = new ByteBuffer[prefix.size()]; + for (int i = 0; i < prefix.size(); i++) + values[i] = prefix.get(i); + return inclusiveEndOf(values); + } + public static Bound create(ClusteringComparator comparator, boolean isStart, boolean isInclusive, Object... 
values) { CBuilder builder = CBuilder.create(comparator); - for (int i = 0; i < values.length; i++) + for (Object val : values) { - Object val = values[i]; if (val instanceof ByteBuffer) - builder.add((ByteBuffer)val); + builder.add((ByteBuffer) val); else builder.add(val); } @@ -483,14 +505,14 @@ public class Slice public String toString(ClusteringComparator comparator) { StringBuilder sb = new StringBuilder(); - sb.append(kind()).append("("); + sb.append(kind()).append('('); for (int i = 0; i < size(); i++) { if (i > 0) sb.append(", "); sb.append(comparator.subtype(i).getString(get(i))); } - return sb.append(")").toString(); + return sb.append(')').toString(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java index ed7584b..51e9d8e 100644 --- a/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java +++ b/src/java/org/apache/cassandra/db/filter/AbstractClusteringIndexFilter.java @@ -28,23 +28,8 @@ import org.apache.cassandra.io.util.DataOutputPlus; public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFilter { - protected enum Kind - { - SLICE (ClusteringIndexSliceFilter.deserializer), - NAMES (ClusteringIndexNamesFilter.deserializer); - - private final InternalDeserializer deserializer; - - private Kind(InternalDeserializer deserializer) - { - this.deserializer = deserializer; - } - } - static final Serializer serializer = new FilterSerializer(); - abstract Kind kind(); - protected final boolean reversed; protected AbstractClusteringIndexFilter(boolean reversed) @@ -101,9 +86,4 @@ public abstract class AbstractClusteringIndexFilter implements ClusteringIndexFi + filter.serializedSizeInternal(version); } } - - protected static abstract class InternalDeserializer - { - public abstract ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java index 33a0917..e3f824f 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java @@ -39,6 +39,24 @@ public interface ClusteringIndexFilter { public static Serializer serializer = AbstractClusteringIndexFilter.serializer; + public enum Kind + { + SLICE (ClusteringIndexSliceFilter.deserializer), + NAMES (ClusteringIndexNamesFilter.deserializer); + + protected final InternalDeserializer deserializer; + + private Kind(InternalDeserializer deserializer) + { + this.deserializer = deserializer; + } + } + + static interface InternalDeserializer + { + public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException; + } + /** * Whether the filter query rows in reversed clustering order or not. 
* @@ -140,6 +158,8 @@ public interface ClusteringIndexFilter */ public boolean shouldInclude(SSTableReader sstable); + public Kind kind(); + public String toString(CFMetaData metadata); public String toCQLString(CFMetaData metadata); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java index f2c81a7..e0bc533 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java @@ -232,7 +232,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter return sb.toString(); } - Kind kind() + public Kind kind() { return Kind.NAMES; } @@ -254,7 +254,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter return size; } - private static class NamesDeserializer extends InternalDeserializer + private static class NamesDeserializer implements InternalDeserializer { public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java index 4f0e4e2..b2d529c 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java @@ -153,7 +153,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter return sb.toString(); } - Kind kind() + public Kind kind() { return Kind.SLICE; } @@ -168,7 +168,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter return Slices.serializer.serializedSize(slices, version); } - private static class SliceDeserializer extends InternalDeserializer + private static class SliceDeserializer implements InternalDeserializer { public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index d2cb87d..2afc785 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -313,6 +313,9 @@ public class ColumnFilter return ""; Iterator<ColumnDefinition> defs = selection.selectOrderIterator(); + if (!defs.hasNext()) + return "<none>"; + StringBuilder sb = new StringBuilder(); appendColumnDef(sb, defs.next()); while (defs.hasNext()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8c64cefd/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java 
b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 458ee30..3e608b4 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -62,7 +62,7 @@ public abstract class DataLimits // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering. public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true); - private enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT } + public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT } public static DataLimits cqlLimits(int cqlRowLimit) { @@ -89,7 +89,7 @@ public abstract class DataLimits return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); } - protected abstract Kind kind(); + public abstract Kind kind(); public abstract boolean isUnlimited(); @@ -199,7 +199,7 @@ public abstract class DataLimits return new CQLLimits(rowLimit, 1, true); } - protected Kind kind() + public Kind kind() { return Kind.CQL_LIMIT; } @@ -368,7 +368,7 @@ public abstract class DataLimits } @Override - protected Kind kind() + public Kind kind() { return Kind.CQL_PAGING_LIMIT; } @@ -432,7 +432,7 @@ public abstract class DataLimits this.cellPerPartitionLimit = cellPerPartitionLimit; } - protected Kind kind() + public Kind kind() { return Kind.THRIFT_LIMIT; } @@ -588,7 +588,7 @@ public abstract class DataLimits super(partitionLimit, cellPerPartitionLimit); } - protected Kind kind() + public Kind kind() { return Kind.SUPER_COLUMN_COUNTING_LIMIT; }
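Several of the visibility changes above (the Kind enum and kind() moving onto the public ClusteringIndexFilter interface, DataLimits.kind() going from protected to public) appear to exist so that the new legacy serializers, which live outside these class hierarchies, can branch on the concrete filter or limit type when writing the pre-3.0 wire formats. The following is a rough, self-contained illustration of that dispatch-on-kind pattern; the types and the byte layout are invented for the example and are not the formats Cassandra actually uses.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Simplified model of dispatch-on-kind serialization: a serializer that sits
// outside the filter classes can only pick the right wire encoding if each
// filter exposes its kind. All names and encodings here are hypothetical.
public class KindDispatchSketch
{
    enum Kind { SLICE, NAMES }

    interface Filter
    {
        Kind kind();
    }

    static class SliceFilter implements Filter
    {
        final String start, end;
        SliceFilter(String start, String end) { this.start = start; this.end = end; }
        public Kind kind() { return Kind.SLICE; }
    }

    static class NamesFilter implements Filter
    {
        final String[] names;
        NamesFilter(String... names) { this.names = names; }
        public Kind kind() { return Kind.NAMES; }
    }

    static byte[] serialize(Filter filter) throws IOException
    {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            // The legacy encoding is chosen per concrete kind, outside the filter classes.
            switch (filter.kind())
            {
                case SLICE:
                    SliceFilter slice = (SliceFilter) filter;
                    out.writeUTF(slice.start);
                    out.writeUTF(slice.end);
                    break;
                case NAMES:
                    NamesFilter names = (NamesFilter) filter;
                    out.writeInt(names.names.length);
                    for (String name : names.names)
                        out.writeUTF(name);
                    break;
            }
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException
    {
        System.out.println(serialize(new SliceFilter("a", "c")).length);
        System.out.println(serialize(new NamesFilter("c1", "c2")).length);
    }
}

Keeping the per-kind wire logic in an external serializer rather than on each filter class is consistent with the patch concentrating pre-3.0 encoding details in the Legacy*Serializer classes.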

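One further small behavioural change worth noting is Slice.forPaging returning this when lastReturned is null: on the first page nothing has been returned yet, so the slice is used unchanged, while on later pages the start of the range (or the end, for reversed queries) is tightened past the last returned clustering. A toy, forward-only model of that idea, with hypothetical names and plain strings instead of clusterings:

import java.util.Comparator;

// Toy model of Slice.forPaging (forward queries only): with no lastReturned the
// slice is unchanged (first page); otherwise the next page restarts strictly
// after the last clustering that was already returned. Names are hypothetical.
public class PagingSliceSketch
{
    static final class ToySlice
    {
        final String start;               // inclusive unless startExclusive
        final boolean startExclusive;
        final String end;                 // always inclusive in this toy model

        ToySlice(String start, boolean startExclusive, String end)
        {
            this.start = start;
            this.startExclusive = startExclusive;
            this.end = end;
        }

        ToySlice forPaging(String lastReturned, Comparator<String> cmp)
        {
            if (lastReturned == null)
                return this;              // first page: nothing returned yet

            // restart strictly after lastReturned, unless it precedes the slice entirely
            return cmp.compare(lastReturned, start) < 0 ? this : new ToySlice(lastReturned, true, end);
        }

        public String toString()
        {
            return (startExclusive ? "(" : "[") + start + ", " + end + "]";
        }
    }

    public static void main(String[] args)
    {
        ToySlice slice = new ToySlice("a", false, "z");
        System.out.println(slice.forPaging(null, Comparator.naturalOrder())); // [a, z]
        System.out.println(slice.forPaging("m", Comparator.naturalOrder()));  // (m, z]
    }
}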