Sends the proper amount of cells to old nodes on DISTINCT queries. Patch by slebresne; reviewed by blerer for CASSANDRA-10762
On a DISTINCT query, 3.0 nodes were sending the 1 row back, but pre-3.0 nodes actually expect only the 1st cell and limits get thrown off if they get more. This could actually be a problem for thrift queries (on CQL tables only) when the limit ended up in the middle of a row. The patch fixes this by enforcing the cell limit while serializing the response to old nodes. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e37b4a9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e37b4a9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e37b4a9 Branch: refs/heads/trunk Commit: 3e37b4a90d4e5a036f24ac3d9a3aa804df6e6969 Parents: eb12770 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Wed Jan 20 17:31:10 2016 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Wed Jan 27 15:45:27 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/LegacyLayout.java | 53 ++++++++- .../org/apache/cassandra/db/ReadCommand.java | 54 +++++---- .../cassandra/db/ReadCommandVerbHandler.java | 2 +- .../org/apache/cassandra/db/ReadResponse.java | 110 ++++++++----------- .../apache/cassandra/db/filter/DataLimits.java | 16 ++- .../db/partitions/PartitionUpdate.java | 4 +- .../UnfilteredPartitionIterators.java | 6 +- .../db/rows/UnfilteredRowIterators.java | 6 +- .../org/apache/cassandra/repair/Validator.java | 2 +- .../apache/cassandra/service/DataResolver.java | 6 +- .../cassandra/service/DigestResolver.java | 6 +- .../apache/cassandra/service/StorageProxy.java | 2 +- .../cassandra/cache/CacheProviderTest.java | 4 +- .../org/apache/cassandra/db/PartitionTest.java | 18 +-- .../apache/cassandra/db/ReadResponseTest.java | 10 +- .../db/SinglePartitionSliceCommandTest.java | 14 +-- .../rows/DigestBackwardCompatibilityTest.java | 5 +- .../cassandra/service/DataResolverTest.java | 2 +- 19 files changed, 191 
insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c99438f..8daeb2d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.3 + * Fix DISTINCT queries in mixed version clusters (CASSANDRA-10762) * Migrate build status for indexes along with legacy schema (CASSANDRA-11046) * Ensure SSTables for legacy KEYS indexes can be read (CASSANDRA-11045) * Added support for IBM zSystems architecture (CASSANDRA-11054) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index 6121227..b90151e 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -32,6 +32,7 @@ import com.google.common.collect.PeekingIterator; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.context.CounterContext; @@ -318,8 +319,46 @@ public abstract class LegacyLayout return CompositeType.build(values); } + /** + * The maximum number of cells to include per partition when converting to the old format. 
+ * <p> + * We already apply the limit during the actual query, but for queries that counts cells and not rows (thrift queries + * and distinct queries as far as old nodes are concerned), we may still include a little bit more than requested + * because {@link DataLimits} always include full rows. So if the limit ends in the middle of a queried row, the + * full row will be part of our result. This would confuse old nodes however so we make sure to truncate it to + * what's expected before writting it on the wire. + * + * @param command the read commmand for which to determine the maximum cells per partition. This can be {@code null} + * in which case {@code Integer.MAX_VALUE} is returned. + * @return the maximum number of cells per partition that should be enforced according to the read command if + * post-query limitation are in order (see above). This will be {@code Integer.MAX_VALUE} if no such limits are + * necessary. + */ + private static int maxCellsPerPartition(ReadCommand command) + { + if (command == null) + return Integer.MAX_VALUE; + + DataLimits limits = command.limits(); + + // There is 2 types of DISTINCT queries: those that includes only the partition key, and those that include static columns. + // On old nodes, the latter expects the first row in term of CQL count, which is what we already have and there is no additional + // limit to apply. The former however expect only one cell per partition and rely on it (See CASSANDRA-10762). + if (limits.isDistinct()) + return command.columnFilter().fetchedColumns().statics.isEmpty() ? 
1 : Integer.MAX_VALUE; + + switch (limits.kind()) + { + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + return limits.perPartitionCount(); + default: + return Integer.MAX_VALUE; + } + } + // For serializing to old wire format - public static LegacyUnfilteredPartition fromUnfilteredRowIterator(UnfilteredRowIterator iterator) + public static LegacyUnfilteredPartition fromUnfilteredRowIterator(ReadCommand command, UnfilteredRowIterator iterator) { // we need to extract the range tombstone so materialize the partition. Since this is // used for the on-wire format, this is not worst than it used to be. @@ -333,6 +372,10 @@ public abstract class LegacyLayout // before we use the LegacyRangeTombstoneList at all List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right); + int maxCellsPerPartition = maxCellsPerPartition(command); + if (cells.size() > maxCellsPerPartition) + cells = cells.subList(0, maxCellsPerPartition); + // The LegacyRangeTombstoneList already has range tombstones for the single-row deletions and complex // deletions. Go through our normal range tombstones and add then to the LegacyRTL so that the range // tombstones all get merged and sorted properly. 
@@ -352,13 +395,13 @@ public abstract class LegacyLayout return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, cells); } - public static void serializeAsLegacyPartition(UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException + public static void serializeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, DataOutputPlus out, int version) throws IOException { assert version < MessagingService.VERSION_30; out.writeBoolean(true); - LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition); + LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition); UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, version); DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, out); @@ -420,7 +463,7 @@ public abstract class LegacyLayout } // For the old wire format - public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator partition, int version) + public static long serializedSizeAsLegacyPartition(ReadCommand command, UnfilteredRowIterator partition, int version) { assert version < MessagingService.VERSION_30; @@ -429,7 +472,7 @@ public abstract class LegacyLayout long size = TypeSizes.sizeof(true); - LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(partition); + LegacyLayout.LegacyUnfilteredPartition legacyPartition = LegacyLayout.fromUnfilteredRowIterator(command, partition); size += UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version); size += DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java index 668a189..f21d100 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -282,11 +282,11 @@ public abstract class ReadCommand implements ReadQuery protected abstract int oldestUnrepairedTombstone(); - public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection) + public ReadResponse createResponse(UnfilteredPartitionIterator iterator) { return isDigestQuery() - ? ReadResponse.createDigestResponse(iterator, digestVersion) - : ReadResponse.createDataResponse(iterator, selection); + ? ReadResponse.createDigestResponse(iterator, this) + : ReadResponse.createDataResponse(iterator, this); } public long indexSerializedSize(int version) @@ -723,18 +723,17 @@ public abstract class ReadCommand implements ReadQuery out.writeBoolean(filter.isReversed()); // limit - DataLimits.Kind kind = rangeCommand.limits().kind(); - boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() == 1; - if (isDistinct) + DataLimits limits = rangeCommand.limits(); + if (limits.isDistinct()) out.writeInt(1); else out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(), filter.requestedSlices())); int compositesToGroup; boolean selectsStatics = !rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING); - if (kind == DataLimits.Kind.THRIFT_LIMIT) + if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT) compositesToGroup = -1; - else if (isDistinct && !selectsStatics) + else if (limits.isDistinct() && !selectsStatics) compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) else compositesToGroup = metadata.isDense() ? 
-1 : metadata.clusteringColumns().size(); @@ -799,11 +798,15 @@ public abstract class ReadCommand implements ReadQuery AbstractBounds<PartitionPosition> keyRange = AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, version); int maxResults = in.readInt(); - in.readBoolean(); // countCQL3Rows (not needed) + boolean countCQL3Rows = in.readBoolean(); // countCQL3Rows (not needed) in.readBoolean(); // isPaging (not needed) boolean selectsStatics = (!selection.fetchedColumns().statics.isEmpty() || filter.selects(Clustering.STATIC_CLUSTERING)); - boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics); + // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, + // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter one is slightly less + // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use + // that fact. + boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); DataLimits limits; if (isDistinct) limits = DataLimits.distinctLimits(maxResults); @@ -1054,9 +1057,13 @@ public abstract class ReadCommand implements ReadQuery RowFilter rowFilter = LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata); int maxResults = in.readInt(); - in.readBoolean(); // countCQL3Rows + boolean countCQL3Rows = in.readBoolean(); - boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit == 1 && selectsStatics); + // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. For the former, + // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. 
The latter one is slightly less + // direct, but we know that on 2.1/2.2 queries, DISTINCT queries are the only CQL queries that have countCQL3Rows to false so we use + // that fact. + boolean isDistinct = compositesToGroup == -2 || (compositesToGroup != -1 && !countCQL3Rows); DataLimits limits; if (isDistinct) limits = DataLimits.distinctLimits(maxResults); @@ -1340,17 +1347,16 @@ public abstract class ReadCommand implements ReadQuery out.writeBoolean(filter.isReversed()); boolean selectsStatics = !command.columnFilter().fetchedColumns().statics.isEmpty() || slices.selects(Clustering.STATIC_CLUSTERING); - DataLimits.Kind kind = command.limits().kind(); - boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1; - if (isDistinct) + DataLimits limits = command.limits(); + if (limits.isDistinct()) out.writeInt(1); // the limit is always 1 for DISTINCT queries else out.writeInt(updateLimitForQuery(command.limits().count(), filter.requestedSlices())); int compositesToGroup; - if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense()) + if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense()) compositesToGroup = -1; - else if (isDistinct && !selectsStatics) + else if (limits.isDistinct() && !selectsStatics) compositesToGroup = -2; // for DISTINCT queries (CASSANDRA-8490) else compositesToGroup = metadata.clusteringColumns().size(); @@ -1369,9 +1375,19 @@ public abstract class ReadCommand implements ReadQuery // if a slice query from a pre-3.0 node doesn't cover statics, we shouldn't select them at all ColumnFilter columnFilter = LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, compositesToGroup, metadata); - boolean isDistinct = compositesToGroup == -2 || (count == 1 && selectsStatics); + // We have 2 types of DISTINCT queries: ones on only the partition key, and ones on the partition key and static columns. 
For the former, + // we can easily detect the case because compositeToGroup is -2 and that's the only case it can be that. The latter is probablematic + // however as we have no way to distinguish it from a normal select with a limit of 1 (and this, contrarily to the range query case + // were the countCQL3Rows boolean allows us to decide). + // So we consider this case not distinct here. This is ok because even if it is a distinct (with static), the count will be 1 and + // we'll still just query one row (a distinct DataLimits currently behave exactly like a CQL limit with a count of 1). The only + // drawback is that we'll send back the first row entirely while a 2.1/2.2 node would return only the first cell in that same + // situation. This isn't a problem for 2.1/2.2 code however (it would be for a range query, as it would throw off the count for + // reasons similar to CASSANDRA-10762, but it's ok for single partition queries). + // We do _not_ want to do the reverse however and consider a 'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make + // us only return the 1st cell rather then 1st row. 
DataLimits limits; - if (compositesToGroup == -2 || isDistinct) + if (compositesToGroup == -2) limits = DataLimits.distinctLimits(count); // See CASSANDRA-8490 for the explanation of this value else if (compositesToGroup == -1) limits = DataLimits.thriftLimits(1, count); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 72a6fa8..9cde8dc 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> ReadResponse response; try (ReadOrderGroup opGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(opGroup)) { - response = command.createResponse(iterator, command.columnFilter()); + response = command.createResponse(iterator); } MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 41f0d5d..a618aa5 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -53,38 +53,38 @@ public abstract class ReadResponse // This is used only when serializing data responses and we can't it easily in other cases. 
So this can be null, which is slighly // hacky, but as this hack doesn't escape this class, and it's easy enough to validate that it's not null when we need, it's "good enough". - private final CFMetaData metadata; + private final ReadCommand command; - protected ReadResponse(CFMetaData metadata) + protected ReadResponse(ReadCommand command) { - this.metadata = metadata; + this.command = command; } - public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) + public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command) { - return new LocalDataResponse(data, selection); + return new LocalDataResponse(data, command); } @VisibleForTesting - public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) + public static ReadResponse createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command) { - return new RemoteDataResponse(LocalDataResponse.build(data, selection)); + return new RemoteDataResponse(LocalDataResponse.build(data, command.columnFilter())); } - public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, int version) + public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command) { - return new DigestResponse(makeDigest(data, version)); + return new DigestResponse(makeDigest(data, command)); } - public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command); - public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command); + public abstract UnfilteredPartitionIterator makeIterator(ReadCommand command); + public abstract ByteBuffer digest(ReadCommand command); public abstract boolean isDigestResponse(); - protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, int version) + protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, ReadCommand command) { 
MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - UnfilteredPartitionIterators.digest(iterator, digest, version); + UnfilteredPartitionIterators.digest(command, iterator, digest, command.digestVersion()); return ByteBuffer.wrap(digest.digest()); } @@ -99,12 +99,12 @@ public abstract class ReadResponse this.digest = digest; } - public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command) + public UnfilteredPartitionIterator makeIterator(ReadCommand command) { throw new UnsupportedOperationException(); } - public ByteBuffer digest(CFMetaData metadata, ReadCommand command) + public ByteBuffer digest(ReadCommand command) { // We assume that the digest is in the proper version, which bug excluded should be true since this is called with // ReadCommand.digestVersion() as argument and that's also what we use to produce the digest in the first place. @@ -122,11 +122,9 @@ public abstract class ReadResponse // built on the owning node responding to a query private static class LocalDataResponse extends DataResponse { - private final ColumnFilter received; - private LocalDataResponse(UnfilteredPartitionIterator iter, ColumnFilter received) + private LocalDataResponse(UnfilteredPartitionIterator iter, ReadCommand command) { - super(iter.metadata(), build(iter, received), SerializationHelper.Flag.LOCAL); - this.received = received; + super(command, build(iter, command.columnFilter()), SerializationHelper.Flag.LOCAL); } private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) @@ -142,14 +140,6 @@ public abstract class ReadResponse throw new RuntimeException(e); } } - - protected ColumnFilter selection(ReadCommand sent) - { - // we didn't send anything, so we don't provide it in the serializer methods, but use the - // object's reference to the original column filter we received - assert sent == null || sent.columnFilter() == received; - return received; - } } // built on the coordinator node receiving a 
response @@ -159,13 +149,6 @@ public abstract class ReadResponse { super(null, data, SerializationHelper.Flag.FROM_REMOTE); } - - protected ColumnFilter selection(ReadCommand sent) - { - // we should always know what we sent, and should provide it in digest() and makeIterator() - assert sent != null; - return sent.columnFilter(); - } } static abstract class DataResponse extends ReadResponse @@ -175,23 +158,24 @@ public abstract class ReadResponse private final ByteBuffer data; private final SerializationHelper.Flag flag; - protected DataResponse(CFMetaData metadata, ByteBuffer data, SerializationHelper.Flag flag) + protected DataResponse(ReadCommand command, ByteBuffer data, SerializationHelper.Flag flag) { - super(metadata); + super(command); this.data = data; this.flag = flag; } - protected abstract ColumnFilter selection(ReadCommand command); - - public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command) + public UnfilteredPartitionIterator makeIterator(ReadCommand command) { try (DataInputBuffer in = new DataInputBuffer(data, true)) { + // Note that the command parameter shadows the 'command' field and this is intended because + // the later can be null (for RemoteDataResponse as those are created in the serializers and + // those don't have easy access to the command). This is also why we need the command as parameter here. 
return UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in, MessagingService.current_version, - metadata, - selection(command), + command.metadata(), + command.columnFilter(), flag); } catch (IOException e) @@ -201,11 +185,11 @@ public abstract class ReadResponse } } - public ByteBuffer digest(CFMetaData metadata, ReadCommand command) + public ByteBuffer digest(ReadCommand command) { - try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) + try (UnfilteredPartitionIterator iterator = makeIterator(command)) { - return makeDigest(iterator, command.digestVersion()); + return makeDigest(iterator, command); } } @@ -229,11 +213,11 @@ public abstract class ReadResponse @VisibleForTesting LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions) { - super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata + super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the command this.partitions = partitions; } - public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, final ReadCommand command) + public UnfilteredPartitionIterator makeIterator(final ReadCommand command) { // Due to a bug in the serialization of AbstractBounds, anything that isn't a Range is understood by pre-3.0 nodes // as a Bound, which means IncludingExcludingBounds and ExcludingBounds responses may include keys they shouldn't. 
@@ -271,7 +255,7 @@ public abstract class ReadResponse public CFMetaData metadata() { - return metadata; + return command.metadata(); } public boolean hasNext() @@ -296,11 +280,11 @@ public abstract class ReadResponse }; } - public ByteBuffer digest(CFMetaData metadata, ReadCommand command) + public ByteBuffer digest(ReadCommand command) { - try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) + try (UnfilteredPartitionIterator iterator = makeIterator(command)) { - return makeDigest(iterator, command.digestVersion()); + return makeDigest(iterator, command); } } @@ -323,14 +307,14 @@ public abstract class ReadResponse out.writeBoolean(isDigest); if (!isDigest) { - assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side - try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null)) + assert response.command != null; // we only serialize LocalDataResponse, which always has the command set + try (UnfilteredPartitionIterator iter = response.makeIterator(response.command)) { assert iter.hasNext(); try (UnfilteredRowIterator partition = iter.next()) { ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); - LegacyLayout.serializeAsLegacyPartition(partition, out, version); + LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version); } assert !iter.hasNext(); } @@ -397,14 +381,14 @@ public abstract class ReadResponse + TypeSizes.sizeof(isDigest); if (!isDigest) { - assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side - try (UnfilteredPartitionIterator iter = response.makeIterator(response.metadata, null)) + assert response.command != null; // we only serialize LocalDataResponse, which always has the command set + try (UnfilteredPartitionIterator iter = response.makeIterator(response.command)) { assert iter.hasNext(); try (UnfilteredRowIterator partition = iter.next()) { size += 
ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); - size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version); + size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version); } assert !iter.hasNext(); } @@ -458,8 +442,8 @@ public abstract class ReadResponse // determine the number of partitions upfront for serialization int numPartitions = 0; - assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + assert response.command != null; // we only serialize LocalDataResponse, which always has the command set + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) { while (iterator.hasNext()) { @@ -476,14 +460,14 @@ public abstract class ReadResponse out.writeInt(numPartitions); - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) { while (iterator.hasNext()) { try (UnfilteredRowIterator partition = iterator.next()) { ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out); - LegacyLayout.serializeAsLegacyPartition(partition, out, version); + LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, version); } } } @@ -509,15 +493,15 @@ public abstract class ReadResponse assert version < MessagingService.VERSION_30; long size = TypeSizes.sizeof(0); // number of partitions - assert !(response instanceof LegacyRemoteDataResponse); // we only use those on the receiving side - try (UnfilteredPartitionIterator iterator = response.makeIterator(response.metadata, null)) + assert response.command != null; // we only serialize LocalDataResponse, which always has the command set + try (UnfilteredPartitionIterator iterator = response.makeIterator(response.command)) { while 
(iterator.hasNext()) { try (UnfilteredRowIterator partition = iterator.next()) { size += ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey()); - size += LegacyLayout.serializedSizeAsLegacyPartition(partition, version); + size += LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 19f24ad..f6fdcdd 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -98,6 +98,7 @@ public abstract class DataLimits public abstract Kind kind(); public abstract boolean isUnlimited(); + public abstract boolean isDistinct(); public abstract DataLimits forPaging(int pageSize); public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining); @@ -232,8 +233,7 @@ public abstract class DataLimits protected final int rowLimit; protected final int perPartitionLimit; - // Whether the query is a distinct query or not. This is currently not used by the code but prior experience - // shows that keeping the information around is wise and might be useful in the future. + // Whether the query is a distinct query or not. 
protected final boolean isDistinct; private CQLLimits(int rowLimit) @@ -268,9 +268,14 @@ public abstract class DataLimits return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT; } + public boolean isDistinct() + { + return isDistinct; + } + public DataLimits forPaging(int pageSize) { - return new CQLLimits(pageSize, perPartitionLimit); + return new CQLLimits(pageSize, perPartitionLimit, isDistinct); } public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) @@ -513,6 +518,11 @@ public abstract class DataLimits return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT; } + public boolean isDistinct() + { + return false; + } + public DataLimits forPaging(int pageSize) { // We don't support paging on thrift in general but do use paging under the hood for get_count. For http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index f10b3b6..6331440 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -613,7 +613,7 @@ public class PartitionUpdate extends AbstractBTreePartition if (version < MessagingService.VERSION_30) { - LegacyLayout.serializeAsLegacyPartition(iter, out, version); + LegacyLayout.serializeAsLegacyPartition(null, iter, out, version); } else { @@ -699,7 +699,7 @@ public class PartitionUpdate extends AbstractBTreePartition try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) { if (version < MessagingService.VERSION_30) - return LegacyLayout.serializedSizeAsLegacyPartition(iter, version); + return LegacyLayout.serializedSizeAsLegacyPartition(null, iter, version); return 
CFMetaData.serializer.serializedSize(update.metadata(), version) + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index a3f7981..41b1424 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -238,11 +238,13 @@ public abstract class UnfilteredPartitionIterators /** * Digests the provided iterator. * + * @param command the command that has yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30} + * as this is only used when producing digest to be sent to legacy nodes. * @param iterator the iterator to digest. * @param digest the {@code MessageDigest} to use for the digest. * @param version the messaging protocol to use when producing the digest.
*/ - public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version) + public static void digest(ReadCommand command, UnfilteredPartitionIterator iterator, MessageDigest digest, int version) { try (UnfilteredPartitionIterator iter = iterator) { @@ -250,7 +252,7 @@ public abstract class UnfilteredPartitionIterators { try (UnfilteredRowIterator partition = iter.next()) { - UnfilteredRowIterators.digest(partition, digest, version); + UnfilteredRowIterators.digest(command, partition, digest, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index ea929d7..9416896 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -102,15 +102,17 @@ public abstract class UnfilteredRowIterators /** * Digests the partition represented by the provided iterator. * + * @param command the command that has yielded {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30} + * as this is only used when producing digest to be sent to legacy nodes. * @param iterator the iterator to digest. * @param digest the {@code MessageDigest} to use for the digest. * @param version the messaging protocol to use when producing the digest.
*/ - public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version) + public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version) { if (version < MessagingService.VERSION_30) { - LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), digest); + LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest); return; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 3db9761..217c9de 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -211,7 +211,7 @@ public class Validator implements Runnable validated++; // MerkleTree uses XOR internally, so we want lots of output bits here CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256")); - UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version); + UnfilteredRowIterators.digest(null, partition, digest, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 return digest.count > 0 ? 
new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index f3858d7..1fe931f 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -50,7 +50,7 @@ public class DataResolver extends ResponseResolver public PartitionIterator getData() { ReadResponse response = responses.iterator().next().payload; - return UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), command), command.nowInSec()); + return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); } public PartitionIterator resolve() @@ -63,7 +63,7 @@ public class DataResolver extends ResponseResolver for (int i = 0; i < count; i++) { MessageIn<ReadResponse> msg = responses.get(i); - iters.add(msg.payload.makeIterator(command.metadata(), command)); + iters.add(msg.payload.makeIterator(command)); sources[i] = msg.from; } @@ -385,7 +385,7 @@ public class DataResolver extends ResponseResolver // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. 
handler.awaitResults(); assert resolver.responses.size() == 1; - return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(), command), retryCommand); + return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java index 62b4538..4a918a3 100644 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/DigestResolver.java @@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver public PartitionIterator getData() { assert isDataPresent(); - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); } /* @@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver { ReadResponse response = message.payload; - ByteBuffer newDigest = response.digest(command.metadata(), command); + ByteBuffer newDigest = response.digest(command); if (digest == null) digest = newDigest; else if (!digest.equals(newDigest)) @@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver if (logger.isTraceEnabled()) logger.trace("resolve: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); - return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(), command), command.nowInSec()); + return UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), command.nowInSec()); } public boolean isDataPresent() 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 89ac0bb..8fa2082 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1779,7 +1779,7 @@ public class StorageProxy implements StorageProxyMBean { try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup)) { - handler.response(command.createResponse(iterator, command.columnFilter())); + handler.response(command.createResponse(iterator)); } MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index cd52d35..a4173d6 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -103,8 +103,8 @@ public class CacheProviderTest { MessageDigest d1 = MessageDigest.getInstance("MD5"); MessageDigest d2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version); - UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version); + UnfilteredRowIterators.digest(null, ((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version); + 
UnfilteredRowIterators.digest(null, ((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version); assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest())); } catch (NoSuchAlgorithmException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/PartitionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java index 623ff0e..7216ab7 100644 --- a/test/unit/org/apache/cassandra/db/PartitionTest.java +++ b/test/unit/org/apache/cassandra/db/PartitionTest.java @@ -138,21 +138,23 @@ public class PartitionTest new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe(); - ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build()); - ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + ReadCommand cmd1 = Util.cmd(cfs, "key1").build(); + ReadCommand cmd2 = Util.cmd(cfs, "key2").build(); + ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1); + ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2); MessageDigest digest1 = MessageDigest.getInstance("MD5"); MessageDigest digest2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version); assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); digest1 = MessageDigest.getInstance("MD5"); digest2 = MessageDigest.getInstance("MD5"); - 
UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version); assertTrue(Arrays.equals(digest1.digest(), digest2.digest())); p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); @@ -160,8 +162,8 @@ public class PartitionTest p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); digest1 = MessageDigest.getInstance("MD5"); digest2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), digest2, version); assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); } finally http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/ReadResponseTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java b/test/unit/org/apache/cassandra/db/ReadResponseTest.java index af0ec60..52ab8bb 100644 --- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java +++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java @@ -69,12 +69,12 @@ public class ReadResponseTest extends CQLTester makePartition(cfs.metadata, "k3")); ReadResponse.LegacyRemoteDataResponse response = new ReadResponse.LegacyRemoteDataResponse(responses); - assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2"); - assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2"); - 
assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3"); + assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2"); + assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2"); + assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3"); - assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2"); - assertPartitions(response.makeIterator(cfs.metadata, Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3"); + assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2"); + assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3"); } private void assertPartitions(UnfilteredPartitionIterator actual, String... 
expectedKeys) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java index 7cacb5e..9af6028 100644 --- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java +++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java @@ -111,14 +111,14 @@ public class SinglePartitionSliceCommandTest logger.debug("ReadCommand: {}", cmd); UnfilteredPartitionIterator partitionIterator = cmd.executeLocally(ReadOrderGroup.emptyGroup()); - ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter()); + ReadResponse response = ReadResponse.createDataResponse(partitionIterator, cmd); logger.debug("creating response: {}", response); - partitionIterator = response.makeIterator(cfm, null); // <- cmd is null + partitionIterator = response.makeIterator(cmd); assert partitionIterator.hasNext(); UnfilteredRowIterator partition = partitionIterator.next(); - LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(partition); + LegacyLayout.LegacyUnfilteredPartition rowIter = LegacyLayout.fromUnfilteredRowIterator(cmd, partition); Assert.assertEquals(Collections.emptyList(), rowIter.cells); } @@ -168,14 +168,14 @@ public class SinglePartitionSliceCommandTest // check (de)serialized iterator for memtable static cell try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) { - response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + response = ReadResponse.createDataResponse(pi, cmd); } out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); 
ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); in = new DataInputBuffer(out.buffer(), true); dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); - try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd)) + try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) { checkForS(pi); } @@ -184,13 +184,13 @@ public class SinglePartitionSliceCommandTest Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush(); try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup)) { - response = ReadResponse.createDataResponse(pi, cmd.columnFilter()); + response = ReadResponse.createDataResponse(pi, cmd); } out = new DataOutputBuffer((int) ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30)); ReadResponse.serializer.serialize(response, out, MessagingService.VERSION_30); in = new DataInputBuffer(out.buffer(), true); dst = ReadResponse.serializer.deserialize(in, MessagingService.VERSION_30); - try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd)) + try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd)) { checkForS(pi); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java index 5503cfb..c8f5cb1 100644 --- a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java +++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java @@ -59,9 +59,10 @@ public class DigestBackwardCompatibilityTest extends CQLTester * return ColumnFamily.digest(partition); */ - ImmutableBTreePartition partition = 
Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build()); + ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build(); + ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(cmd); MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, MessagingService.VERSION_22); + UnfilteredRowIterators.digest(cmd, partition.unfilteredIterator(), digest, MessagingService.VERSION_22); return ByteBuffer.wrap(digest.digest()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/service/DataResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java index ecffbbd..997f4e4 100644 --- a/test/unit/org/apache/cassandra/service/DataResolverTest.java +++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java @@ -712,7 +712,7 @@ public class DataResolverTest public MessageIn<ReadResponse> readResponseMessage(InetAddress from, UnfilteredPartitionIterator partitionIterator, ReadCommand cmd) { return MessageIn.create(from, - ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()), + ReadResponse.createRemoteDataResponse(partitionIterator, cmd), Collections.EMPTY_MAP, MessagingService.Verb.REQUEST_RESPONSE, MessagingService.current_version);