Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 eb1277007 -> 3e37b4a90
  refs/heads/cassandra-3.3 58c706177 -> 595600050
  refs/heads/trunk c6265a644 -> de405f2b6


Sends the proper amount of cells to old nodes on DISTINCT

patch by slebresne; reviewed by blerer for CASSANDRA-10762

On a DISTINCT query, 3.0 nodes were sending the whole 1st row back, but
pre-3.0 nodes actually expect only the 1st cell and limits get thrown off
if they get more. The same problem could also affect thrift queries
(on CQL tables only) when the limit ended up in the middle of a row.
The patch fixes this by enforcing the cell limit while serializing the
response to old nodes.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e37b4a9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e37b4a9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e37b4a9

Branch: refs/heads/cassandra-3.0
Commit: 3e37b4a90d4e5a036f24ac3d9a3aa804df6e6969
Parents: eb12770
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Jan 20 17:31:10 2016 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Wed Jan 27 15:45:27 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/LegacyLayout.java   |  53 ++++++++-
 .../org/apache/cassandra/db/ReadCommand.java    |  54 +++++----
 .../cassandra/db/ReadCommandVerbHandler.java    |   2 +-
 .../org/apache/cassandra/db/ReadResponse.java   | 110 ++++++++-----------
 .../apache/cassandra/db/filter/DataLimits.java  |  16 ++-
 .../db/partitions/PartitionUpdate.java          |   4 +-
 .../UnfilteredPartitionIterators.java           |   6 +-
 .../db/rows/UnfilteredRowIterators.java         |   6 +-
 .../org/apache/cassandra/repair/Validator.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   6 +-
 .../cassandra/service/DigestResolver.java       |   6 +-
 .../apache/cassandra/service/StorageProxy.java  |   2 +-
 .../cassandra/cache/CacheProviderTest.java      |   4 +-
 .../org/apache/cassandra/db/PartitionTest.java  |  18 +--
 .../apache/cassandra/db/ReadResponseTest.java   |  10 +-
 .../db/SinglePartitionSliceCommandTest.java     |  14 +--
 .../rows/DigestBackwardCompatibilityTest.java   |   5 +-
 .../cassandra/service/DataResolverTest.java     |   2 +-
 19 files changed, 191 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c99438f..8daeb2d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.3
+ * Fix DISTINCT queries in mixed version clusters (CASSANDRA-10762)
  * Migrate build status for indexes along with legacy schema (CASSANDRA-11046)
  * Ensure SSTables for legacy KEYS indexes can be read (CASSANDRA-11045)
  * Added support for IBM zSystems architecture (CASSANDRA-11054)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/LegacyLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java 
b/src/java/org/apache/cassandra/db/LegacyLayout.java
index 6121227..b90151e 100644
--- a/src/java/org/apache/cassandra/db/LegacyLayout.java
+++ b/src/java/org/apache/cassandra/db/LegacyLayout.java
@@ -32,6 +32,7 @@ import com.google.common.collect.PeekingIterator;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.context.CounterContext;
@@ -318,8 +319,46 @@ public abstract class LegacyLayout
         return CompositeType.build(values);
     }
 
+    /**
+     * The maximum number of cells to include per partition when converting to 
the old format.
+     * <p>
+     * We already apply the limit during the actual query, but for queries 
that counts cells and not rows (thrift queries
+     * and distinct queries as far as old nodes are concerned), we may still 
include a little bit more than requested
+     * because {@link DataLimits} always include full rows. So if the limit 
ends in the middle of a queried row, the
+     * full row will be part of our result. This would confuse old nodes 
however so we make sure to truncate it to
+     * what's expected before writing it on the wire.
+     *
+     * @param command the read command for which to determine the maximum 
cells per partition. This can be {@code null}
+     * in which case {@code Integer.MAX_VALUE} is returned.
+     * @return the maximum number of cells per partition that should be 
enforced according to the read command if
+     * post-query limitation are in order (see above). This will be {@code 
Integer.MAX_VALUE} if no such limits are
+     * necessary.
+     */
+    private static int maxCellsPerPartition(ReadCommand command)
+    {
+        if (command == null)
+            return Integer.MAX_VALUE;
+
+        DataLimits limits = command.limits();
+
+        // There are 2 types of DISTINCT queries: those that include only the 
partition key, and those that include static columns.
+        // On old nodes, the latter expects the first row in term of CQL 
count, which is what we already have and there is no additional
+        // limit to apply. The former however expects only one cell per 
partition and relies on it (See CASSANDRA-10762).
+        if (limits.isDistinct())
+            return command.columnFilter().fetchedColumns().statics.isEmpty() ? 
1 : Integer.MAX_VALUE;
+
+        switch (limits.kind())
+        {
+            case THRIFT_LIMIT:
+            case SUPER_COLUMN_COUNTING_LIMIT:
+                return limits.perPartitionCount();
+            default:
+                return Integer.MAX_VALUE;
+        }
+    }
+
     // For serializing to old wire format
-    public static LegacyUnfilteredPartition 
fromUnfilteredRowIterator(UnfilteredRowIterator iterator)
+    public static LegacyUnfilteredPartition 
fromUnfilteredRowIterator(ReadCommand command, UnfilteredRowIterator iterator)
     {
         // we need to extract the range tombstone so materialize the 
partition. Since this is
         // used for the on-wire format, this is not worst than it used to be.
@@ -333,6 +372,10 @@ public abstract class LegacyLayout
         // before we use the LegacyRangeTombstoneList at all
         List<LegacyLayout.LegacyCell> cells = Lists.newArrayList(pair.right);
 
+        int maxCellsPerPartition = maxCellsPerPartition(command);
+        if (cells.size() > maxCellsPerPartition)
+            cells = cells.subList(0, maxCellsPerPartition);
+
         // The LegacyRangeTombstoneList already has range tombstones for the 
single-row deletions and complex
         // deletions.  Go through our normal range tombstones and add then to 
the LegacyRTL so that the range
         // tombstones all get merged and sorted properly.
@@ -352,13 +395,13 @@ public abstract class LegacyLayout
         return new LegacyUnfilteredPartition(info.getPartitionDeletion(), rtl, 
cells);
     }
 
-    public static void serializeAsLegacyPartition(UnfilteredRowIterator 
partition, DataOutputPlus out, int version) throws IOException
+    public static void serializeAsLegacyPartition(ReadCommand command, 
UnfilteredRowIterator partition, DataOutputPlus out, int version) throws 
IOException
     {
         assert version < MessagingService.VERSION_30;
 
         out.writeBoolean(true);
 
-        LegacyLayout.LegacyUnfilteredPartition legacyPartition = 
LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = 
LegacyLayout.fromUnfilteredRowIterator(command, partition);
 
         UUIDSerializer.serializer.serialize(partition.metadata().cfId, out, 
version);
         DeletionTime.serializer.serialize(legacyPartition.partitionDeletion, 
out);
@@ -420,7 +463,7 @@ public abstract class LegacyLayout
     }
 
     // For the old wire format
-    public static long serializedSizeAsLegacyPartition(UnfilteredRowIterator 
partition, int version)
+    public static long serializedSizeAsLegacyPartition(ReadCommand command, 
UnfilteredRowIterator partition, int version)
     {
         assert version < MessagingService.VERSION_30;
 
@@ -429,7 +472,7 @@ public abstract class LegacyLayout
 
         long size = TypeSizes.sizeof(true);
 
-        LegacyLayout.LegacyUnfilteredPartition legacyPartition = 
LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition legacyPartition = 
LegacyLayout.fromUnfilteredRowIterator(command, partition);
 
         size += 
UUIDSerializer.serializer.serializedSize(partition.metadata().cfId, version);
         size += 
DeletionTime.serializer.serializedSize(legacyPartition.partitionDeletion);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 668a189..f21d100 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -282,11 +282,11 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
-    public ReadResponse createResponse(UnfilteredPartitionIterator iterator, 
ColumnFilter selection)
+    public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
         return isDigestQuery()
-             ? ReadResponse.createDigestResponse(iterator, digestVersion)
-             : ReadResponse.createDataResponse(iterator, selection);
+             ? ReadResponse.createDigestResponse(iterator, this)
+             : ReadResponse.createDataResponse(iterator, this);
     }
 
     public long indexSerializedSize(int version)
@@ -723,18 +723,17 @@ public abstract class ReadCommand implements ReadQuery
                 out.writeBoolean(filter.isReversed());
 
                 // limit
-                DataLimits.Kind kind = rangeCommand.limits().kind();
-                boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || 
kind == DataLimits.Kind.CQL_PAGING_LIMIT) && 
rangeCommand.limits().perPartitionCount() == 1;
-                if (isDistinct)
+                DataLimits limits = rangeCommand.limits();
+                if (limits.isDistinct())
                     out.writeInt(1);
                 else
                     
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(),
 filter.requestedSlices()));
 
                 int compositesToGroup;
                 boolean selectsStatics = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || 
filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-                if (kind == DataLimits.Kind.THRIFT_LIMIT)
+                if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
                     compositesToGroup = -1;
-                else if (isDistinct && !selectsStatics)
+                else if (limits.isDistinct() && !selectsStatics)
                     compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
                 else
                     compositesToGroup = metadata.isDense() ? -1 : 
metadata.clusteringColumns().size();
@@ -799,11 +798,15 @@ public abstract class ReadCommand implements ReadQuery
             AbstractBounds<PartitionPosition> keyRange = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, 
version);
             int maxResults = in.readInt();
 
-            in.readBoolean();  // countCQL3Rows (not needed)
+            boolean countCQL3Rows = in.readBoolean();  // countCQL3Rows (used 
below to detect DISTINCT queries)
             in.readBoolean();  // isPaging (not needed)
 
             boolean selectsStatics = 
(!selection.fetchedColumns().statics.isEmpty() || 
filter.selects(Clustering.STATIC_CLUSTERING));
-            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit 
== 1 && selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter one is slightly less
+            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries 
are the only CQL queries that have countCQL3Rows to false so we use
+            // that fact.
+            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup 
!= -1 && !countCQL3Rows);
             DataLimits limits;
             if (isDistinct)
                 limits = DataLimits.distinctLimits(maxResults);
@@ -1054,9 +1057,13 @@ public abstract class ReadCommand implements ReadQuery
 
             RowFilter rowFilter = 
LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
             int maxResults = in.readInt();
-            in.readBoolean(); // countCQL3Rows
+            boolean countCQL3Rows = in.readBoolean();
 
-            boolean isDistinct = compositesToGroup == -2 || (perPartitionLimit 
== 1 && selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter one is slightly less
+            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries 
are the only CQL queries that have countCQL3Rows to false so we use
+            // that fact.
+            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup 
!= -1 && !countCQL3Rows);
             DataLimits limits;
             if (isDistinct)
                 limits = DataLimits.distinctLimits(maxResults);
@@ -1340,17 +1347,16 @@ public abstract class ReadCommand implements ReadQuery
             out.writeBoolean(filter.isReversed());
 
             boolean selectsStatics = 
!command.columnFilter().fetchedColumns().statics.isEmpty() || 
slices.selects(Clustering.STATIC_CLUSTERING);
-            DataLimits.Kind kind = command.limits().kind();
-            boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == 
DataLimits.Kind.CQL_PAGING_LIMIT) && command.limits().perPartitionCount() == 1;
-            if (isDistinct)
+            DataLimits limits = command.limits();
+            if (limits.isDistinct())
                 out.writeInt(1);  // the limit is always 1 for DISTINCT queries
             else
                 out.writeInt(updateLimitForQuery(command.limits().count(), 
filter.requestedSlices()));
 
             int compositesToGroup;
-            if (kind == DataLimits.Kind.THRIFT_LIMIT || metadata.isDense())
+            if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || 
metadata.isDense())
                 compositesToGroup = -1;
-            else if (isDistinct && !selectsStatics)
+            else if (limits.isDistinct() && !selectsStatics)
                 compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
             else
                 compositesToGroup = metadata.clusteringColumns().size();
@@ -1369,9 +1375,19 @@ public abstract class ReadCommand implements ReadQuery
             // if a slice query from a pre-3.0 node doesn't cover statics, we 
shouldn't select them at all
             ColumnFilter columnFilter = 
LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, 
compositesToGroup, metadata);
 
-            boolean isDistinct = compositesToGroup == -2 || (count == 1 && 
selectsStatics);
+            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
+            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter is problematic
+            // however as we have no way to distinguish it from a normal 
select with a limit of 1 (and this, contrarily to the range query case
+            // where the countCQL3Rows boolean allows us to decide).
+            // So we consider this case not distinct here. This is ok because 
even if it is a distinct (with static), the count will be 1 and
+            // we'll still just query one row (a distinct DataLimits currently 
behave exactly like a CQL limit with a count of 1). The only
+            // drawback is that we'll send back the first row entirely while a 
2.1/2.2 node would return only the first cell in that same
+            // situation. This isn't a problem for 2.1/2.2 code however (it 
would be for a range query, as it would throw off the count for
+            // reasons similar to CASSANDRA-10762, but it's ok for single 
partition queries).
+            // We do _not_ want to do the reverse however and consider a 
'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make
+            // us only return the 1st cell rather than the 1st row.
             DataLimits limits;
-            if (compositesToGroup == -2 || isDistinct)
+            if (compositesToGroup == -2)
                 limits = DataLimits.distinctLimits(count);  // See 
CASSANDRA-8490 for the explanation of this value
             else if (compositesToGroup == -1)
                 limits = DataLimits.thriftLimits(1, count);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java 
b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 72a6fa8..9cde8dc 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -44,7 +44,7 @@ public class ReadCommandVerbHandler implements 
IVerbHandler<ReadCommand>
         ReadResponse response;
         try (ReadOrderGroup opGroup = command.startOrderGroup(); 
UnfilteredPartitionIterator iterator = command.executeLocally(opGroup))
         {
-            response = command.createResponse(iterator, 
command.columnFilter());
+            response = command.createResponse(iterator);
         }
 
         MessageOut<ReadResponse> reply = new 
MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java 
b/src/java/org/apache/cassandra/db/ReadResponse.java
index 41f0d5d..a618aa5 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -53,38 +53,38 @@ public abstract class ReadResponse
 
     // This is used only when serializing data responses and we can't it 
easily in other cases. So this can be null, which is slighly
     // hacky, but as this hack doesn't escape this class, and it's easy enough 
to validate that it's not null when we need, it's "good enough".
-    private final CFMetaData metadata;
+    private final ReadCommand command;
 
-    protected ReadResponse(CFMetaData metadata)
+    protected ReadResponse(ReadCommand command)
     {
-        this.metadata = metadata;
+        this.command = command;
     }
 
-    public static ReadResponse createDataResponse(UnfilteredPartitionIterator 
data, ColumnFilter selection)
+    public static ReadResponse createDataResponse(UnfilteredPartitionIterator 
data, ReadCommand command)
     {
-        return new LocalDataResponse(data, selection);
+        return new LocalDataResponse(data, command);
     }
 
     @VisibleForTesting
-    public static ReadResponse 
createRemoteDataResponse(UnfilteredPartitionIterator data, ColumnFilter 
selection)
+    public static ReadResponse 
createRemoteDataResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
-        return new RemoteDataResponse(LocalDataResponse.build(data, 
selection));
+        return new RemoteDataResponse(LocalDataResponse.build(data, 
command.columnFilter()));
     }
 
-    public static ReadResponse 
createDigestResponse(UnfilteredPartitionIterator data, int version)
+    public static ReadResponse 
createDigestResponse(UnfilteredPartitionIterator data, ReadCommand command)
     {
-        return new DigestResponse(makeDigest(data, version));
+        return new DigestResponse(makeDigest(data, command));
     }
 
-    public abstract UnfilteredPartitionIterator makeIterator(CFMetaData 
metadata, ReadCommand command);
-    public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand 
command);
+    public abstract UnfilteredPartitionIterator makeIterator(ReadCommand 
command);
+    public abstract ByteBuffer digest(ReadCommand command);
 
     public abstract boolean isDigestResponse();
 
-    protected static ByteBuffer makeDigest(UnfilteredPartitionIterator 
iterator, int version)
+    protected static ByteBuffer makeDigest(UnfilteredPartitionIterator 
iterator, ReadCommand command)
     {
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        UnfilteredPartitionIterators.digest(iterator, digest, version);
+        UnfilteredPartitionIterators.digest(command, iterator, digest, 
command.digestVersion());
         return ByteBuffer.wrap(digest.digest());
     }
 
@@ -99,12 +99,12 @@ public abstract class ReadResponse
             this.digest = digest;
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, 
ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(ReadCommand command)
         {
             throw new UnsupportedOperationException();
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
             // We assume that the digest is in the proper version, which bug 
excluded should be true since this is called with
             // ReadCommand.digestVersion() as argument and that's also what we 
use to produce the digest in the first place.
@@ -122,11 +122,9 @@ public abstract class ReadResponse
     // built on the owning node responding to a query
     private static class LocalDataResponse extends DataResponse
     {
-        private final ColumnFilter received;
-        private LocalDataResponse(UnfilteredPartitionIterator iter, 
ColumnFilter received)
+        private LocalDataResponse(UnfilteredPartitionIterator iter, 
ReadCommand command)
         {
-            super(iter.metadata(), build(iter, received), 
SerializationHelper.Flag.LOCAL);
-            this.received = received;
+            super(command, build(iter, command.columnFilter()), 
SerializationHelper.Flag.LOCAL);
         }
 
         private static ByteBuffer build(UnfilteredPartitionIterator iter, 
ColumnFilter selection)
@@ -142,14 +140,6 @@ public abstract class ReadResponse
                 throw new RuntimeException(e);
             }
         }
-
-        protected ColumnFilter selection(ReadCommand sent)
-        {
-            // we didn't send anything, so we don't provide it in the 
serializer methods, but use the
-            // object's reference to the original column filter we received
-            assert sent == null || sent.columnFilter() == received;
-            return received;
-        }
     }
 
     // built on the coordinator node receiving a response
@@ -159,13 +149,6 @@ public abstract class ReadResponse
         {
             super(null, data, SerializationHelper.Flag.FROM_REMOTE);
         }
-
-        protected ColumnFilter selection(ReadCommand sent)
-        {
-            // we should always know what we sent, and should provide it in 
digest() and makeIterator()
-            assert sent != null;
-            return sent.columnFilter();
-        }
     }
 
     static abstract class DataResponse extends ReadResponse
@@ -175,23 +158,24 @@ public abstract class ReadResponse
         private final ByteBuffer data;
         private final SerializationHelper.Flag flag;
 
-        protected DataResponse(CFMetaData metadata, ByteBuffer data, 
SerializationHelper.Flag flag)
+        protected DataResponse(ReadCommand command, ByteBuffer data, 
SerializationHelper.Flag flag)
         {
-            super(metadata);
+            super(command);
             this.data = data;
             this.flag = flag;
         }
 
-        protected abstract ColumnFilter selection(ReadCommand command);
-
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, 
ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(ReadCommand command)
         {
             try (DataInputBuffer in = new DataInputBuffer(data, true))
             {
+                // Note that the command parameter shadows the 'command' field 
and this is intended because
+                // the later can be null (for RemoteDataResponse as those are 
created in the serializers and
+                // those don't have easy access to the command). This is also 
why we need the command as parameter here.
                 return 
UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
                                                                                
          MessagingService.current_version,
-                                                                               
          metadata,
-                                                                               
          selection(command),
+                                                                               
          command.metadata(),
+                                                                               
          command.columnFilter(),
                                                                                
          flag);
             }
             catch (IOException e)
@@ -201,11 +185,11 @@ public abstract class ReadResponse
             }
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, 
command))
+            try (UnfilteredPartitionIterator iterator = makeIterator(command))
             {
-                return makeDigest(iterator, command.digestVersion());
+                return makeDigest(iterator, command);
             }
         }
 
@@ -229,11 +213,11 @@ public abstract class ReadResponse
         @VisibleForTesting
         LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
         {
-            super(null); // we never serialize LegacyRemoteDataResponses, so 
we don't care about the metadata
+            super(null); // we never serialize LegacyRemoteDataResponses, so 
we don't care about the command
             this.partitions = partitions;
         }
 
-        public UnfilteredPartitionIterator makeIterator(CFMetaData metadata, 
final ReadCommand command)
+        public UnfilteredPartitionIterator makeIterator(final ReadCommand 
command)
         {
             // Due to a bug in the serialization of AbstractBounds, anything 
that isn't a Range is understood by pre-3.0 nodes
             // as a Bound, which means IncludingExcludingBounds and 
ExcludingBounds responses may include keys they shouldn't.
@@ -271,7 +255,7 @@ public abstract class ReadResponse
 
                 public CFMetaData metadata()
                 {
-                    return metadata;
+                    return command.metadata();
                 }
 
                 public boolean hasNext()
@@ -296,11 +280,11 @@ public abstract class ReadResponse
             };
         }
 
-        public ByteBuffer digest(CFMetaData metadata, ReadCommand command)
+        public ByteBuffer digest(ReadCommand command)
         {
-            try (UnfilteredPartitionIterator iterator = makeIterator(metadata, 
command))
+            try (UnfilteredPartitionIterator iterator = makeIterator(command))
             {
-                return makeDigest(iterator, command.digestVersion());
+                return makeDigest(iterator, command);
             }
         }
 
@@ -323,14 +307,14 @@ public abstract class ReadResponse
                 out.writeBoolean(isDigest);
                 if (!isDigest)
                 {
-                    assert !(response instanceof LegacyRemoteDataResponse); // 
we only use those on the receiving side
-                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.metadata, null))
+                    assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
+                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.command))
                     {
                         assert iter.hasNext();
                         try (UnfilteredRowIterator partition = iter.next())
                         {
                             
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                            LegacyLayout.serializeAsLegacyPartition(partition, 
out, version);
+                            
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, 
version);
                         }
                         assert !iter.hasNext();
                     }
@@ -397,14 +381,14 @@ public abstract class ReadResponse
                         + TypeSizes.sizeof(isDigest);
                 if (!isDigest)
                 {
-                    assert !(response instanceof LegacyRemoteDataResponse); // 
we only use those on the receiving side
-                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.metadata, null))
+                    assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
+                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.command))
                     {
                         assert iter.hasNext();
                         try (UnfilteredRowIterator partition = iter.next())
                         {
                             size += 
ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                            size += 
LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                            size += 
LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, 
version);
                         }
                         assert !iter.hasNext();
                     }
@@ -458,8 +442,8 @@ public abstract class ReadResponse
 
             // determine the number of partitions upfront for serialization
             int numPartitions = 0;
-            assert !(response instanceof LegacyRemoteDataResponse); // we only 
use those on the receiving side
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.metadata, null))
+            assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
+            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
@@ -476,14 +460,14 @@ public abstract class ReadResponse
 
             out.writeInt(numPartitions);
 
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.metadata, null))
+            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
                     try (UnfilteredRowIterator partition = iterator.next())
                     {
                         
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                        LegacyLayout.serializeAsLegacyPartition(partition, 
out, version);
+                        
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, 
version);
                     }
                 }
             }
@@ -509,15 +493,15 @@ public abstract class ReadResponse
             assert version < MessagingService.VERSION_30;
             long size = TypeSizes.sizeof(0);  // number of partitions
 
-            assert !(response instanceof LegacyRemoteDataResponse); // we only 
use those on the receiving side
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.metadata, null))
+            assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
+            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
             {
                 while (iterator.hasNext())
                 {
                     try (UnfilteredRowIterator partition = iterator.next())
                     {
                         size += 
ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                        size += 
LegacyLayout.serializedSizeAsLegacyPartition(partition, version);
+                        size += 
LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, 
version);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java 
b/src/java/org/apache/cassandra/db/filter/DataLimits.java
index 19f24ad..f6fdcdd 100644
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -98,6 +98,7 @@ public abstract class DataLimits
     public abstract Kind kind();
 
     public abstract boolean isUnlimited();
+    public abstract boolean isDistinct();
 
     public abstract DataLimits forPaging(int pageSize);
     public abstract DataLimits forPaging(int pageSize, ByteBuffer 
lastReturnedKey, int lastReturnedKeyRemaining);
@@ -232,8 +233,7 @@ public abstract class DataLimits
         protected final int rowLimit;
         protected final int perPartitionLimit;
 
-        // Whether the query is a distinct query or not. This is currently not 
used by the code but prior experience
-        // shows that keeping the information around is wise and might be 
useful in the future.
+        // Whether the query is a distinct query or not.
         protected final boolean isDistinct;
 
         private CQLLimits(int rowLimit)
@@ -268,9 +268,14 @@ public abstract class DataLimits
             return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
         }
 
+        public boolean isDistinct()
+        {
+            return isDistinct;
+        }
+
         public DataLimits forPaging(int pageSize)
         {
-            return new CQLLimits(pageSize, perPartitionLimit);
+            return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
         }
 
         public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, 
int lastReturnedKeyRemaining)
@@ -513,6 +518,11 @@ public abstract class DataLimits
             return partitionLimit == NO_LIMIT && cellPerPartitionLimit == 
NO_LIMIT;
         }
 
+        public boolean isDistinct()
+        {
+            return false;
+        }
+
         public DataLimits forPaging(int pageSize)
         {
             // We don't support paging on thrift in general but do use paging 
under the hood for get_count. For

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index f10b3b6..6331440 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -613,7 +613,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
                 if (version < MessagingService.VERSION_30)
                 {
-                    LegacyLayout.serializeAsLegacyPartition(iter, out, 
version);
+                    LegacyLayout.serializeAsLegacyPartition(null, iter, out, 
version);
                 }
                 else
                 {
@@ -699,7 +699,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             try (UnfilteredRowIterator iter = 
update.sliceableUnfilteredIterator())
             {
                 if (version < MessagingService.VERSION_30)
-                    return LegacyLayout.serializedSizeAsLegacyPartition(iter, 
version);
+                    return LegacyLayout.serializedSizeAsLegacyPartition(null, 
iter, version);
 
                 return CFMetaData.serializer.serializedSize(update.metadata(), 
version)
                      + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, 
update.rowCount());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index a3f7981..41b1424 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -238,11 +238,13 @@ public abstract class UnfilteredPartitionIterators
     /**
      * Digests the the provided iterator.
      *
+     * @param command the command that yielded {@code iterator}. This can be 
null if {@code version >= MessagingService.VERSION_30}
+     * as this is only used when producing a digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredPartitionIterator iterator, 
MessageDigest digest, int version)
+    public static void digest(ReadCommand command, UnfilteredPartitionIterator 
iterator, MessageDigest digest, int version)
     {
         try (UnfilteredPartitionIterator iter = iterator)
         {
@@ -250,7 +252,7 @@ public abstract class UnfilteredPartitionIterators
             {
                 try (UnfilteredRowIterator partition = iter.next())
                 {
-                    UnfilteredRowIterators.digest(partition, digest, version);
+                    UnfilteredRowIterators.digest(command, partition, digest, 
version);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index ea929d7..9416896 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -102,15 +102,17 @@ public abstract class UnfilteredRowIterators
     /**
      * Digests the partition represented by the provided iterator.
      *
+     * @param command the command that yielded {@code iterator}. This can be 
null if {@code version >= MessagingService.VERSION_30}
+     * as this is only used when producing a digest to be sent to legacy nodes.
      * @param iterator the iterator to digest.
      * @param digest the {@code MessageDigest} to use for the digest.
      * @param version the messaging protocol to use when producing the digest.
      */
-    public static void digest(UnfilteredRowIterator iterator, MessageDigest 
digest, int version)
+    public static void digest(ReadCommand command, UnfilteredRowIterator 
iterator, MessageDigest digest, int version)
     {
         if (version < MessagingService.VERSION_30)
         {
-            
LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), 
digest);
+            LegacyLayout.fromUnfilteredRowIterator(command, 
iterator).digest(iterator.metadata(), digest);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java 
b/src/java/org/apache/cassandra/repair/Validator.java
index 3db9761..217c9de 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -211,7 +211,7 @@ public class Validator implements Runnable
         validated++;
         // MerkleTree uses XOR internally, so we want lots of output bits here
         CountingDigest digest = new 
CountingDigest(FBUtilities.newMessageDigest("SHA-256"));
-        UnfilteredRowIterators.digest(partition, digest, 
MessagingService.current_version);
+        UnfilteredRowIterators.digest(null, partition, digest, 
MessagingService.current_version);
         // only return new hash for merkle tree in case digest was updated - 
see CASSANDRA-8979
         return digest.count > 0
              ? new MerkleTree.RowHash(partition.partitionKey().getToken(), 
digest.digest(), digest.count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index f3858d7..1fe931f 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -50,7 +50,7 @@ public class DataResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         ReadResponse response = responses.iterator().next().payload;
-        return 
UnfilteredPartitionIterators.filter(response.makeIterator(command.metadata(), 
command), command.nowInSec());
+        return 
UnfilteredPartitionIterators.filter(response.makeIterator(command), 
command.nowInSec());
     }
 
     public PartitionIterator resolve()
@@ -63,7 +63,7 @@ public class DataResolver extends ResponseResolver
         for (int i = 0; i < count; i++)
         {
             MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command.metadata(), command));
+            iters.add(msg.payload.makeIterator(command));
             sources[i] = msg.from;
         }
 
@@ -385,7 +385,7 @@ public class DataResolver extends ResponseResolver
                 // We don't call handler.get() because we want to preserve 
tombstones since we're still in the middle of merging node results.
                 handler.awaitResults();
                 assert resolver.responses.size() == 1;
-                return 
UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command.metadata(),
 command), retryCommand);
+                return 
UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command),
 retryCommand);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java 
b/src/java/org/apache/cassandra/service/DigestResolver.java
index 62b4538..4a918a3 100644
--- a/src/java/org/apache/cassandra/service/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/DigestResolver.java
@@ -48,7 +48,7 @@ public class DigestResolver extends ResponseResolver
     public PartitionIterator getData()
     {
         assert isDataPresent();
-        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(),
 command), command.nowInSec());
+        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), 
command.nowInSec());
     }
 
     /*
@@ -77,7 +77,7 @@ public class DigestResolver extends ResponseResolver
         {
             ReadResponse response = message.payload;
 
-            ByteBuffer newDigest = response.digest(command.metadata(), 
command);
+            ByteBuffer newDigest = response.digest(command);
             if (digest == null)
                 digest = newDigest;
             else if (!digest.equals(newDigest))
@@ -88,7 +88,7 @@ public class DigestResolver extends ResponseResolver
         if (logger.isTraceEnabled())
             logger.trace("resolve: {} ms.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
 
-        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command.metadata(),
 command), command.nowInSec());
+        return 
UnfilteredPartitionIterators.filter(dataResponse.makeIterator(command), 
command.nowInSec());
     }
 
     public boolean isDataPresent()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 89ac0bb..8fa2082 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -1779,7 +1779,7 @@ public class StorageProxy implements StorageProxyMBean
             {
                 try (ReadOrderGroup orderGroup = command.startOrderGroup(); 
UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup))
                 {
-                    handler.response(command.createResponse(iterator, 
command.columnFilter()));
+                    handler.response(command.createResponse(iterator));
                 }
                 
MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java 
b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index cd52d35..a4173d6 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -103,8 +103,8 @@ public class CacheProviderTest
         {
             MessageDigest d1 = MessageDigest.getInstance("MD5");
             MessageDigest d2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(((CachedBTreePartition) 
one).unfilteredIterator(), d1, MessagingService.current_version);
-            UnfilteredRowIterators.digest(((CachedBTreePartition) 
two).unfilteredIterator(), d2, MessagingService.current_version);
+            UnfilteredRowIterators.digest(null, ((CachedBTreePartition) 
one).unfilteredIterator(), d1, MessagingService.current_version);
+            UnfilteredRowIterators.digest(null, ((CachedBTreePartition) 
two).unfilteredIterator(), d2, MessagingService.current_version);
             assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest()));
         }
         catch (NoSuchAlgorithmException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/PartitionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java 
b/test/unit/org/apache/cassandra/db/PartitionTest.java
index 623ff0e..7216ab7 100644
--- a/test/unit/org/apache/cassandra/db/PartitionTest.java
+++ b/test/unit/org/apache/cassandra/db/PartitionTest.java
@@ -138,21 +138,23 @@ public class PartitionTest
 
             new RowUpdateBuilder(cfs.metadata, 5, 
"key2").clustering("c").add("val", "val2").build().applyUnsafe();
 
-            ImmutableBTreePartition p1 = 
Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build());
-            ImmutableBTreePartition p2 = 
Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build());
+            ReadCommand cmd1 = Util.cmd(cfs, "key1").build();
+            ReadCommand cmd2 = Util.cmd(cfs, "key2").build();
+            ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(cmd1);
+            ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(cmd2);
 
             MessageDigest digest1 = MessageDigest.getInstance("MD5");
             MessageDigest digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, 
version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, 
version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), 
digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), 
digest2, version);
             assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
"key2").build());
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
"key2").build());
             digest1 = MessageDigest.getInstance("MD5");
             digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, 
version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, 
version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), 
digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), 
digest2, version);
             assertTrue(Arrays.equals(digest1.digest(), digest2.digest()));
 
             p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
"key2").build());
@@ -160,8 +162,8 @@ public class PartitionTest
             p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, 
"key2").build());
             digest1 = MessageDigest.getInstance("MD5");
             digest2 = MessageDigest.getInstance("MD5");
-            UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, 
version);
-            UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, 
version);
+            UnfilteredRowIterators.digest(cmd1, p1.unfilteredIterator(), 
digest1, version);
+            UnfilteredRowIterators.digest(cmd2, p2.unfilteredIterator(), 
digest2, version);
             assertFalse(Arrays.equals(digest1.digest(), digest2.digest()));
         }
         finally

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/ReadResponseTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ReadResponseTest.java 
b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
index af0ec60..52ab8bb 100644
--- a/test/unit/org/apache/cassandra/db/ReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadResponseTest.java
@@ -69,12 +69,12 @@ public class ReadResponseTest extends CQLTester
                                                                 
makePartition(cfs.metadata, "k3"));
         ReadResponse.LegacyRemoteDataResponse response = new 
ReadResponse.LegacyRemoteDataResponse(responses);
 
-        assertPartitions(response.makeIterator(cfs.metadata, 
Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()), "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, 
Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()), "k1", "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, 
Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()), "k2", "k3");
+        
assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k3").build()),
 "k2");
+        
assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k0").toKeyExcl("k3").build()),
 "k1", "k2");
+        
assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyExcl("k1").toKeyExcl("k4").build()),
 "k2", "k3");
 
-        assertPartitions(response.makeIterator(cfs.metadata, 
Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()), "k1", "k2");
-        assertPartitions(response.makeIterator(cfs.metadata, 
Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()), "k1", "k2", "k3");
+        
assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k3").build()),
 "k1", "k2");
+        
assertPartitions(response.makeIterator(Util.cmd(cfs).fromKeyIncl("k1").toKeyExcl("k4").build()),
 "k1", "k2", "k3");
     }
 
     private void assertPartitions(UnfilteredPartitionIterator actual, 
String... expectedKeys)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java 
b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 7cacb5e..9af6028 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -111,14 +111,14 @@ public class SinglePartitionSliceCommandTest
 
         logger.debug("ReadCommand: {}", cmd);
         UnfilteredPartitionIterator partitionIterator = 
cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        ReadResponse response = 
ReadResponse.createDataResponse(partitionIterator, cmd.columnFilter());
+        ReadResponse response = 
ReadResponse.createDataResponse(partitionIterator, cmd);
 
         logger.debug("creating response: {}", response);
-        partitionIterator = response.makeIterator(cfm, null);  // <- cmd is 
null
+        partitionIterator = response.makeIterator(cmd);
         assert partitionIterator.hasNext();
         UnfilteredRowIterator partition = partitionIterator.next();
 
-        LegacyLayout.LegacyUnfilteredPartition rowIter = 
LegacyLayout.fromUnfilteredRowIterator(partition);
+        LegacyLayout.LegacyUnfilteredPartition rowIter = 
LegacyLayout.fromUnfilteredRowIterator(cmd, partition);
         Assert.assertEquals(Collections.emptyList(), rowIter.cells);
     }
 
@@ -168,14 +168,14 @@ public class SinglePartitionSliceCommandTest
         // check (de)serialized iterator for memtable static cell
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); 
UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
         {
-            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+            response = ReadResponse.createDataResponse(pi, cmd);
         }
 
         out = new DataOutputBuffer((int) 
ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, 
MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, 
MessagingService.VERSION_30);
-        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
         {
             checkForS(pi);
         }
@@ -184,13 +184,13 @@ public class SinglePartitionSliceCommandTest
         
Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
         try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); 
UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
         {
-            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+            response = ReadResponse.createDataResponse(pi, cmd);
         }
         out = new DataOutputBuffer((int) 
ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, 
MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, 
MessagingService.VERSION_30);
-        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cmd))
         {
             checkForS(pi);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java 
b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
index 5503cfb..c8f5cb1 100644
--- 
a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
+++ 
b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java
@@ -59,9 +59,10 @@ public class DigestBackwardCompatibilityTest extends 
CQLTester
          *   return ColumnFamily.digest(partition);
          */
 
-        ImmutableBTreePartition partition = 
Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), 
partitionKey).build());
+        ReadCommand cmd = Util.cmd(getCurrentColumnFamilyStore(), 
partitionKey).build();
+        ImmutableBTreePartition partition = 
Util.getOnlyPartitionUnfiltered(cmd);
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, 
MessagingService.VERSION_22);
+        UnfilteredRowIterators.digest(cmd, partition.unfilteredIterator(), 
digest, MessagingService.VERSION_22);
         return ByteBuffer.wrap(digest.digest());
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e37b4a9/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java 
b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index ecffbbd..997f4e4 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -712,7 +712,7 @@ public class DataResolverTest
     public MessageIn<ReadResponse> readResponseMessage(InetAddress from, 
UnfilteredPartitionIterator partitionIterator, ReadCommand cmd)
     {
         return MessageIn.create(from,
-                                
ReadResponse.createRemoteDataResponse(partitionIterator, cmd.columnFilter()),
+                                
ReadResponse.createRemoteDataResponse(partitionIterator, cmd),
                                 Collections.EMPTY_MAP,
                                 MessagingService.Verb.REQUEST_RESPONSE,
                                 MessagingService.current_version);

Reply via email to