Merge branch cassandra-3.0 into cassandra-3.11

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aa89a64
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aa89a64
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aa89a64

Branch: refs/heads/trunk
Commit: 7aa89a64e09c57061418c1d83c03ae7cfd0cd745
Parents: bd89f56 e2445cf
Author: Benjamin Lerer <b.le...@gmail.com>
Authored: Fri Jul 14 17:18:19 2017 +0200
Committer: Benjamin Lerer <b.le...@gmail.com>
Committed: Fri Jul 14 17:19:38 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   6 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  |   3 +-
 src/java/org/apache/cassandra/db/DataRange.java |   5 +
 .../cassandra/db/PartitionRangeReadCommand.java |   6 +
 .../org/apache/cassandra/db/ReadCommand.java    |   2 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |  12 +
 .../db/SinglePartitionReadCommand.java          |  22 +-
 .../apache/cassandra/db/filter/DataLimits.java  |  82 +++---
 .../apache/cassandra/db/filter/RowFilter.java   |  15 +
 .../apache/cassandra/service/CacheService.java  |   2 +-
 .../apache/cassandra/service/DataResolver.java  |   4 +-
 .../apache/cassandra/service/StorageProxy.java  |   8 +-
 .../service/pager/AbstractQueryPager.java       |   2 +-
 .../service/pager/MultiPartitionPager.java      |   9 +-
 .../cassandra/service/pager/QueryPagers.java    |   2 +-
 .../org/apache/cassandra/cql3/CQLTester.java    |   8 +-
 .../validation/operations/SelectLimitTest.java  | 279 +++++++++++++++++++
 .../db/rows/UnfilteredRowIteratorsTest.java     |  10 +-
 18 files changed, 418 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e7ad6fb,c916452..edd66e2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -6,53 -2,16 +6,53 @@@ Merged from 3.0
  * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482)
  * Set test.runners based on cores and memory size (CASSANDRA-13078)
  * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557)
 - * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
  * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
  * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)
 - Merged from 2.2:
 -  * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223)
 -  * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
 -  * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
 -  * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
 +Merged from 2.2:
-   * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
-   * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
-   * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
++ * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223)
++ * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272)
++ * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592)
++ * Fix nested Tuples/UDTs validation (CASSANDRA-13646)
 
 -3.0.14
 +3.11.0
 + * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606)
 + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216)
 + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549)
 + * Fix the problem with duplicated rows when using paging with SASI (CASSANDRA-13302)
 + * Allow CONTAINS statements filtering on the partition key and its parts (CASSANDRA-13275)
 + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed unevenly (CASSANDRA-13229)
 + * Fix duration type validation to prevent overflow (CASSANDRA-13218)
 + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228)
 + * Reject multiple values for a key in CQL grammar. (CASSANDRA-13369)
 + * UDA fails without input rows (CASSANDRA-13399)
 + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188)
 + * V5 protocol flags decoding broken (CASSANDRA-13443)
 + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422)
 + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329)
 + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962)
 + * Add charset to Analyser input stream (CASSANDRA-13151)
 + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820)
 + * cdc column addition strikes again (CASSANDRA-13382)
 + * Fix static column indexes (CASSANDRA-13277)
 + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298)
 + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370)
 + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247)
 + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317)
 + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366)
 + * Support unaligned memory access for AArch64 (CASSANDRA-13326)
 + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915)
 + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174)
 + * Obfuscate password in stress-graphs (CASSANDRA-12233)
 + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034)
 + * nodetool stopdaemon errors out (CASSANDRA-13030)
 + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954)
 + * Fix primary index calculation for SASI (CASSANDRA-12910)
 + * More fixes to the TokenAllocator (CASSANDRA-12990)
 + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
 + * Address message coalescing regression (CASSANDRA-12676)
 + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417)
 + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307)
 + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835)
 +Merged from 3.0:
  * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172)
  * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004)
  * Failed unregistering mbean during drop keyspace (CASSANDRA-13346)

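For context, the headline fix merged here is CASSANDRA-11223: a SELECT that filters on clustering columns with a LIMIT could return fewer rows than requested on tables with static columns, because a partition whose only live data was its static row was counted against the limit. A minimal reproduction sketch in the style of the CQLTester-based tests added below (table layout and values are illustrative, not taken verbatim from the commit):

    createTable("CREATE TABLE %s (a int, b int, s int static, c int, PRIMARY KEY (a, b))");

    execute("INSERT INTO %s (a, s) VALUES (0, 9)");             // partition 0: live static row, no regular rows
    execute("INSERT INTO %s (a, b, s, c) VALUES (1, 1, 9, 2)");
    execute("INSERT INTO %s (a, b, s, c) VALUES (2, 1, 9, 3)");

    // Partition 0 matches no row, but its live static row used to be counted
    // against the limit, so only one of the two matching rows came back.
    assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"),
               row(1, 1, 9, 2),
               row(2, 1, 9, 3));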
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 7d72212,319eeb4..11444db
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@@ -479,6 -462,6 +479,7 @@@ public class SinglePartitionReadComman
  
                      // We want to cache only rowsToCache rows
                      CachedPartition toCache = CachedBTreePartition.create(toCacheIterator, nowInSec());
++
                      if (sentinelSuccess && !toCache.isEmpty())
                      {
                          Tracing.trace("Caching {} rows", toCache.rowCount());
@@@ -1047,47 -1015,32 +1057,55 @@@
              return commands.get(0).metadata();
          }
  
+         @Override
+         public boolean selectsFullPartition()
+         {
+             return selectsFullPartitions;
+         }
+ 
 -        public ReadOrderGroup startOrderGroup()
 +        public ReadExecutionController executionController()
          {
              // Note that the only difference between the commands in a group must be the partition key on which
              // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
 -            return commands.get(0).startOrderGroup();
 +            return commands.get(0).executionController();
          }
  
 -        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        public PartitionIterator executeInternal(ReadExecutionController controller)
          {
-             return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), nowInSec);
 -            List<PartitionIterator> partitions = new ArrayList<>(commands.size());
 -            for (SinglePartitionReadCommand cmd : commands)
 -                partitions.add(cmd.executeInternal(orderGroup));
 -
 -            // Because we only enforce the limit per command, we need to enforce it globally.
 -            return limits.filter(PartitionIterators.concat(partitions),
++            return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec),
+                                  nowInSec,
+                                  selectsFullPartitions);
          }
  
 -        public QueryPager getPager(PagingState pagingState, int protocolVersion)
 +        public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController)
 +        {
 +            return executeLocally(executionController, true);
 +        }
 +
 +        /**
 +         * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}.
 +         *
 +         * @param executionController - the {@code ReadExecutionController} protecting the read.
 +         * @param sort - whether to sort the inner commands by partition key, required for merging the iterators
 +         *               later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)}
 +         *               because in this case it is safe to do so as there is no merging involved and we don't want to
 +         *               change the old behavior, which was to not sort by partition.
 +         *
 +         * @return - the iterator that can be used to retrieve the query result.
 +         */
 +        private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort)
 +        {
 +            List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size());
 +            for (SinglePartitionReadCommand cmd : commands)
 +                partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController)));
 +
 +            if (sort)
 +                Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft()));
 +
 +            return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList()));
 +        }
 +
 +        public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion)
          {
              if (commands.size() == 1)
                  return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion);

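The new Group.executeLocally(controller, sort) above pairs each per-partition iterator with its partition key and sorts the pairs before concatenating, so that downstream merging sees partitions in key order; executeInternal() passes sort=false since nothing is merged there. A standalone sketch of that sort-then-concat pattern (placeholder types, not the Cassandra classes):

    import java.util.*;
    import java.util.stream.*;

    final class SortThenConcat
    {
        // Stand-in for Pair<DecoratedKey, UnfilteredPartitionIterator>.
        static final class Partition
        {
            final int key;               // plays the role of the partition key
            final List<String> rows;
            Partition(int key, List<String> rows) { this.key = key; this.rows = rows; }
        }

        static List<String> concatInKeyOrder(List<Partition> partitions, boolean sort)
        {
            List<Partition> ordered = new ArrayList<>(partitions);
            if (sort) // required when the result is merged with other key-ordered sources
                Collections.sort(ordered, (p1, p2) -> Integer.compare(p1.key, p2.key));
            return ordered.stream().flatMap(p -> p.rows.stream()).collect(Collectors.toList());
        }

        public static void main(String[] args)
        {
            List<Partition> parts = Arrays.asList(new Partition(2, Arrays.asList("b")),
                                                  new Partition(1, Arrays.asList("a")));
            System.out.println(concatInKeyOrder(parts, true));  // [a, b]
            System.out.println(concatInKeyOrder(parts, false)); // [b, a]
        }
    }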
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java
index abe029b,48ec06a..0c8cd37
--- a/src/java/org/apache/cassandra/db/filter/DataLimits.java
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@@ -135,18 -109,7 +139,18 @@@ public abstract class DataLimit
  
      public abstract DataLimits forShortReadRetry(int toFetch);
  
 +    /**
 +     * Creates a <code>DataLimits</code> instance to be used for paginating GROUP BY queries internally.
 +     *
 +     * @param state the <code>GroupMaker</code> state
 +     * @return a <code>DataLimits</code> instance to be used for paginating GROUP BY queries internally
 +     */
 +    public DataLimits forGroupByInternalPaging(GroupingState state)
 +    {
 +        throw new UnsupportedOperationException();
 +    }
 +
-     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec);
+     public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData);
  
      /**
       * Returns a new {@code Counter} for this limits.
@@@ -171,25 -136,23 +177,29 @@@
  
      public abstract int perPartitionCount();
  
 +    /**
 +     * Returns equivalent limits but where any internal state kept to track paging and/or grouping progress is
 +     * discarded.
 +     */
 +    public abstract DataLimits withoutState();
 +
-     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec)
+     public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter,
+                                               int nowInSec,
+                                               boolean countPartitionsWithOnlyStaticData)
      {
-         return this.newCounter(nowInSec, false).applyTo(iter);
+         return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
      }
  
-     public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec)
+     public UnfilteredRowIterator filter(UnfilteredRowIterator iter,
+                                         int nowInSec,
+                                         boolean countPartitionsWithOnlyStaticData)
      {
-         return this.newCounter(nowInSec, false).applyTo(iter);
+         return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter);
      }
  
-     public PartitionIterator filter(PartitionIterator iter, int nowInSec)
+     public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
      {
-         return this.newCounter(nowInSec, true).applyTo(iter);
+         return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter);
      }
  
      /**
@@@ -429,14 -352,20 +439,16 @@@
  
          protected class CQLCounter extends Counter
          {
 -            protected final int nowInSec;
 -            protected final boolean assumeLiveData;
 -            protected final boolean countPartitionsWithOnlyStaticData;
 -
              protected int rowCounted;
              protected int rowInCurrentPartition;
++            protected final boolean countPartitionsWithOnlyStaticData;
  
              protected boolean hasLiveStaticRow;
  
-             public CQLCounter(int nowInSec, boolean assumeLiveData)
+             public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
              {
 -                this.nowInSec = nowInSec;
 -                this.assumeLiveData = assumeLiveData;
 +                super(nowInSec, assumeLiveData);
+                 this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
              }
  
              @Override
@@@ -554,15 -473,9 +566,15 @@@
          }
  
          @Override
 +        public DataLimits withoutState()
 +        {
 +            return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
+         public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
          {
-             return new PagingAwareCounter(nowInSec, assumeLiveData);
+             return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
          }
  
          private class PagingAwareCounter extends CQLCounter
@@@ -593,499 -506,6 +605,502 @@@
      }
  
      /**
 +     * <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates.
 +     * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. As a consequence,
 +     * the limits keep track of the number of rows as well as the number of groups.</p>
 +     * <p>A group can only be counted once the next group or the end of the data is reached.</p>
 +     */
 +    private static class CQLGroupByLimits extends CQLLimits
 +    {
 +        /**
 +         * The <code>GroupMaker</code> state
 +         */
 +        protected final GroupingState state;
 +
 +        /**
 +         * The GROUP BY specification
 +         */
 +        protected final AggregationSpecification groupBySpec;
 +
 +        /**
 +         * The limit on the number of groups
 +         */
 +        protected final int groupLimit;
 +
 +        /**
 +         * The limit on the number of groups per partition
 +         */
 +        protected final int groupPerPartitionLimit;
 +
 +        public CQLGroupByLimits(int groupLimit,
 +                                int groupPerPartitionLimit,
 +                                int rowLimit,
 +                                AggregationSpecification groupBySpec)
 +        {
 +            this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
 +        }
 +
 +        private CQLGroupByLimits(int groupLimit,
 +                                 int groupPerPartitionLimit,
 +                                 int rowLimit,
 +                                 AggregationSpecification groupBySpec,
 +                                 GroupingState state)
 +        {
 +            super(rowLimit, NO_LIMIT, false);
 +            this.groupLimit = groupLimit;
 +            this.groupPerPartitionLimit = groupPerPartitionLimit;
 +            this.groupBySpec = groupBySpec;
 +            this.state = state;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_GROUP_BY_LIMIT;
 +        }
 +
 +        @Override
 +        public boolean isGroupByLimit()
 +        {
 +            return true;
 +        }
 +
 +        public boolean isUnlimited()
 +        {
 +            return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
 +        }
 +
 +        public DataLimits forShortReadRetry(int toFetch)
 +        {
 +            return new CQLLimits(toFetch);
 +        }
 +
 +        @Override
 +        public float estimateTotalResults(ColumnFamilyStore cfs)
 +        {
 +            // For the moment, we return the estimated number of rows as we have no good way of estimating
 +            // the number of groups that will be returned. Hopefully, we should be able to fix
 +            // that problem at some point.
 +            return super.estimateTotalResults(cfs);
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            return new CQLGroupByLimits(pageSize,
 +                                        groupPerPartitionLimit,
 +                                        rowLimit,
 +                                        groupBySpec,
 +                                        state);
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            return new CQLGroupByPagingLimits(pageSize,
 +                                              groupPerPartitionLimit,
 +                                              rowLimit,
 +                                              groupBySpec,
 +                                              state,
 +                                              lastReturnedKey,
 +                                              lastReturnedKeyRemaining);
 +        }
 +
 +        @Override
 +        public DataLimits forGroupByInternalPaging(GroupingState state)
 +        {
 +            return new CQLGroupByLimits(rowLimit,
 +                                        groupPerPartitionLimit,
 +                                        rowLimit,
 +                                        groupBySpec,
 +                                        state);
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
-             return new GroupByAwareCounter(nowInSec, assumeLiveData);
++            return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        @Override
 +        public int count()
 +        {
 +            return groupLimit;
 +        }
 +
 +        @Override
 +        public int perPartitionCount()
 +        {
 +            return groupPerPartitionLimit;
 +        }
 +
 +        @Override
 +        public DataLimits withoutState()
 +        {
 +            return state == GroupingState.EMPTY_STATE
 +                 ? this
 +                 : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +
 +            if (groupLimit != NO_LIMIT)
 +            {
 +                sb.append("GROUP LIMIT ").append(groupLimit);
 +                if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (groupPerPartitionLimit != NO_LIMIT)
 +            {
 +                sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
 +                if (rowLimit != NO_LIMIT)
 +                    sb.append(' ');
 +            }
 +
 +            if (rowLimit != NO_LIMIT)
 +            {
 +                sb.append("LIMIT ").append(rowLimit);
 +            }
 +
 +            return sb.toString();
 +        }
 +
 +        @Override
 +        public boolean isExhausted(Counter counter)
 +        {
 +            return ((GroupByAwareCounter) counter).rowCounted < rowLimit
 +                    && counter.counted() < groupLimit;
 +        }
 +
 +        protected class GroupByAwareCounter extends Counter
 +        {
 +            private final GroupMaker groupMaker;
 +
++            protected final boolean countPartitionsWithOnlyStaticData;
++
 +            /**
 +             * The key of the partition being processed.
 +             */
 +            protected DecoratedKey currentPartitionKey;
 +
 +            /**
 +             * The number of rows counted so far.
 +             */
 +            protected int rowCounted;
 +
 +            /**
 +             * The number of rows counted so far in the current partition.
 +             */
 +            protected int rowCountedInCurrentPartition;
 +
 +            /**
 +             * The number of groups counted so far. A group is counted only once it is complete
 +             * (e.g. the next one has been reached).
 +             */
 +            protected int groupCounted;
 +
 +            /**
 +             * The number of groups in the current partition.
 +             */
 +            protected int groupInCurrentPartition;
 +
 +            protected boolean hasGroupStarted;
 +
 +            protected boolean hasLiveStaticRow;
 +
 +            protected boolean hasReturnedRowsFromCurrentPartition;
 +
-             private GroupByAwareCounter(int nowInSec, boolean assumeLiveData)
++            private GroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
 +                super(nowInSec, assumeLiveData);
 +                this.groupMaker = groupBySpec.newGroupMaker(state);
++                this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
 +
 +                // If the end of the partition was reached at the same time as the row limit, the last group might
 +                // not have been counted yet. Due to that we need to guess, based on the state, if the previous group
 +                // is still open.
 +                hasGroupStarted = state.hasClustering();
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(state.partitionKey()))
 +                {
 +                    // The only case where we could have state.partitionKey() equal to the partition key
 +                    // is if some of the partition rows have been returned in the previous page but the
 +                    // partition was not exhausted (as the state partition key has not been updated yet).
 +                    // Since we know we have returned rows, we know we have already accounted for
 +                    // the static row if any, so force hasLiveStaticRow to false so we make sure to not count it
 +                    // once more.
 +                    hasLiveStaticRow = false;
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                    hasGroupStarted = true;
 +                }
 +                else
 +                {
 +                    // We need to increment our count of groups if we have reached a new one and unless we had no new
 +                    // content added since we closed our last group (that is, if hasGroupStarted). Note that we may get
 +                    // here with hasGroupStarted == false in the following cases:
 +                    // * the partition limit was reached for the previous partition
 +                    // * the previous partition contained only one static row
 +                    // * the rows of the last group of the previous partition were all marked as deleted
 +                    if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
 +                    {
 +                        incrementGroupCount();
 +                        // If we detect, before starting the new partition, that we are done, we need to increase
 +                        // the per partition group count of the previous partition as the next page will start from
 +                        // there.
 +                        if (isDone())
 +                            incrementGroupInCurrentPartitionCount();
 +                        hasGroupStarted = false;
 +                    }
 +                    hasReturnedRowsFromCurrentPartition = false;
 +                    hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
 +                }
 +                currentPartitionKey = partitionKey;
 +                // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
 +                // because the pager needs to retrieve the count associated with the last value it has returned.
 +                if (!isDone())
 +                {
 +                    groupInCurrentPartition = 0;
 +                    rowCountedInCurrentPartition = 0;
 +                }
 +            }
 +
 +            @Override
 +            protected Row applyToStatic(Row row)
 +            {
 +                // It's possible that we're "done" if the partition we just started bumped the number of groups (in
 +                // applyToPartition() above), in which case Transformation will still call this method. In that case, we
 +                // want to ignore the static row, it should (and will) be returned with the next page/group if need be.
 +                if (isDone())
 +                {
 +                    hasLiveStaticRow = false; // The row has not been returned
 +                    return Rows.EMPTY_STATIC_ROW;
 +                }
 +                return row;
 +            }
 +
 +            @Override
 +            public Row applyToRow(Row row)
 +            {
 +                // We want to check if the row belongs to a new group even if it has been deleted. The goal is
 +                // to minimize the chances of having to go through the same data twice if we detect on the next
 +                // non-deleted row that we have reached the limit.
 +                if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
 +                {
 +                    if (hasGroupStarted)
 +                    {
 +                        incrementGroupCount();
 +                        incrementGroupInCurrentPartitionCount();
 +                    }
 +                    hasGroupStarted = false;
 +                }
 +
 +                // That row may have made us increment the group count, which may mean we're done for this partition, in
 +                // which case we shouldn't count this row (it won't be returned).
 +                if (isDoneForPartition())
 +                {
 +                    hasGroupStarted = false;
 +                    return null;
 +                }
 +
 +                if (isLive(row))
 +                {
 +                    hasGroupStarted = true;
 +                    incrementRowCount();
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                }
 +
 +                return row;
 +            }
 +
 +            @Override
 +            public int counted()
 +            {
 +                return groupCounted;
 +            }
 +
 +            @Override
 +            public int countedInCurrentPartition()
 +            {
 +                return groupInCurrentPartition;
 +            }
 +
 +            @Override
 +            public int rowCounted()
 +            {
 +                return rowCounted;
 +            }
 +
 +            @Override
 +            public int rowCountedInCurrentPartition()
 +            {
 +                return rowCountedInCurrentPartition;
 +            }
 +
 +            protected void incrementRowCount()
 +            {
 +                rowCountedInCurrentPartition++;
 +                if (++rowCounted >= rowLimit)
 +                    stop();
 +            }
 +
 +            private void incrementGroupCount()
 +            {
 +                groupCounted++;
 +                if (groupCounted >= groupLimit)
 +                    stop();
 +            }
 +
 +            private void incrementGroupInCurrentPartitionCount()
 +            {
 +                groupInCurrentPartition++;
 +                if (groupInCurrentPartition >= groupPerPartitionLimit)
 +                    stopInPartition();
 +            }
 +
 +            @Override
 +            public boolean isDoneForPartition()
 +            {
 +                return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
 +            }
 +
 +            @Override
 +            public boolean isDone()
 +            {
 +                return groupCounted >= groupLimit;
 +            }
 +
 +            @Override
 +            public void onPartitionClose()
 +            {
 +                // Normally, we don't count static rows as, from a CQL point of view, they will be merged with other
 +                // rows in the partition. However, if we only have the static row, it will be returned as one group
 +                // so count it.
-                 if (hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
++                if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
 +                {
 +                    incrementRowCount();
 +                    incrementGroupCount();
 +                    incrementGroupInCurrentPartitionCount();
 +                    hasGroupStarted = false;
 +                }
 +                super.onPartitionClose();
 +            }
 +
 +            @Override
 +            public void onClose()
 +            {
 +                // Groups are only counted when the end of the group is reached.
 +                // The end of a group is detected in 2 ways:
 +                // 1) a new group is reached
 +                // 2) the end of the data is reached
 +                // We know that the end of the data is reached if the group limit has not been reached
 +                // and the number of rows counted is smaller than the internal page size.
 +                if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
 +                {
 +                    incrementGroupCount();
 +                    incrementGroupInCurrentPartitionCount();
 +                }
 +
 +                super.onClose();
 +            }
 +        }
 +    }
 +
 +    private static class CQLGroupByPagingLimits extends CQLGroupByLimits
 +    {
 +        private final ByteBuffer lastReturnedKey;
 +
 +        private final int lastReturnedKeyRemaining;
 +
 +        public CQLGroupByPagingLimits(int groupLimit,
 +                                      int groupPerPartitionLimit,
 +                                      int rowLimit,
 +                                      AggregationSpecification groupBySpec,
 +                                      GroupingState state,
 +                                      ByteBuffer lastReturnedKey,
 +                                      int lastReturnedKeyRemaining)
 +        {
 +            super(groupLimit,
 +                  groupPerPartitionLimit,
 +                  rowLimit,
 +                  groupBySpec,
 +                  state);
 +
 +            this.lastReturnedKey = lastReturnedKey;
 +            this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
 +        }
 +
 +        @Override
 +        public Kind kind()
 +        {
 +            return Kind.CQL_GROUP_BY_PAGING_LIMIT;
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
 +        public DataLimits forGroupByInternalPaging(GroupingState state)
 +        {
 +            throw new UnsupportedOperationException();
 +        }
 +
 +        @Override
-         public Counter newCounter(int nowInSec, boolean assumeLiveData)
++        public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +        {
 +            assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
-             return new PagingGroupByAwareCounter(nowInSec, assumeLiveData);
++            return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +        }
 +
 +        @Override
 +        public DataLimits withoutState()
 +        {
 +            return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
 +        }
 +
 +        private class PagingGroupByAwareCounter extends GroupByAwareCounter
 +        {
-             private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData)
++            private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData)
 +            {
-                 super(nowInSec, assumeLiveData);
++                super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData);
 +            }
 +
 +            @Override
 +            public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
 +            {
 +                if (partitionKey.getKey().equals(lastReturnedKey))
 +                {
 +                    currentPartitionKey = partitionKey;
 +                    groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
 +                    hasReturnedRowsFromCurrentPartition = true;
 +                    hasLiveStaticRow = false;
 +                    hasGroupStarted = state.hasClustering();
 +                }
 +                else
 +                {
 +                    super.applyToPartition(partitionKey, staticRow);
 +                }
 +            }
 +        }
 +    }
 +
 +    /**
      * Limits used by thrift; these count partitions and cells.
       */
      private static class ThriftLimits extends DataLimits

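Two behaviors of the GroupByAwareCounter above are worth calling out: a group is only counted when the next group begins or the input ends (so a page boundary never double-counts a half-read group), and onPartitionClose() counts a static-only partition as one group only when countPartitionsWithOnlyStaticData is set. A toy model of the first rule, with string group keys standing in for (partition key, clustering prefix) and GroupMaker elided:

    import java.util.Arrays;
    import java.util.List;

    final class GroupCounterSketch
    {
        static int countGroups(List<String> groupKeys, int groupLimit)
        {
            int counted = 0;
            String openGroup = null;              // ~ hasGroupStarted plus the current group key
            for (String key : groupKeys)
            {
                if (openGroup != null && !openGroup.equals(key))
                {
                    if (++counted >= groupLimit)  // ~ incrementGroupCount() followed by stop()
                        return counted;
                }
                openGroup = key;
            }
            if (openGroup != null)                // ~ onClose(): close the still-open last group
                counted++;
            return counted;
        }

        public static void main(String[] args)
        {
            List<String> rows = Arrays.asList("A", "A", "B", "C"); // rows belonging to groups A, B, C
            System.out.println(countGroups(rows, 10)); // 3
            System.out.println(countGroups(rows, 2));  // 2: stops once the group limit is hit
        }
    }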
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/filter/RowFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CacheService.java
index a4e18c0,d23bdb0..e209450
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@@ -439,10 -441,9 +439,10 @@@ public class CacheService implements Ca
                  {
                      DecoratedKey key = cfs.decorateKey(buffer);
                      int nowInSec = FBUtilities.nowInSeconds();
 -                    try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op))
 +                    SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key);
 +                    try (ReadExecutionController controller = cmd.executionController(); UnfilteredRowIterator iter = cmd.queryMemtableAndDisk(cfs, controller))
                      {
-                         CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec);
+                         CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec, true), nowInSec);
                          return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache);
                      }
                  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index f1c4dd1,c96a893..0dd1e7e
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -406,14 -390,12 +406,14 @@@ public class DataResolver extends Respo
          private final InetAddress source;
          private final DataLimits.Counter counter;
          private final DataLimits.Counter postReconciliationCounter;
 +        private final long queryStartNanoTime;
  
 -        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
 +        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime)
          {
              this.source = source;
-             this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount();
+             this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount();
              this.postReconciliationCounter = postReconciliationCounter;
 +            this.queryStartNanoTime = queryStartNanoTime;
          }
  
          @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 0da25a9,6610cf7..b8f87d9
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -2281,7 -2164,9 +2281,9 @@@ public class StorageProxy implements St
  
          // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
  
-         return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec());
 -        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)),
++        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)),
+                                        command.nowInSec(),
+                                        command.selectsFullPartition());
      }
  
      public Map<String, List<String>> getSchemaVersions()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index da388d0,11bbc0e..9dae11c
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@@ -84,27 -84,6 +85,31 @@@ public class MultiPartitionPager implem
          remaining = state == null ? limit.count() : state.remaining;
      }
  
-     private MultiPartitionPager(SinglePartitionPager[] pagers, DataLimits limit, int nowInSec, int remaining, int current)
++    private MultiPartitionPager(SinglePartitionPager[] pagers,
++                                DataLimits limit,
++                                int nowInSec,
++                                int remaining,
++                                int current)
 +    {
 +        this.pagers = pagers;
 +        this.limit = limit;
 +        this.nowInSec = nowInSec;
 +        this.remaining = remaining;
 +        this.current = current;
 +    }
 +
 +    public QueryPager withUpdatedLimit(DataLimits newLimits)
 +    {
 +        SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length);
 +        newPagers[current] = newPagers[current].withUpdatedLimit(newLimits);
 +
 +        return new MultiPartitionPager(newPagers,
 +                                       newLimits,
 +                                       nowInSec,
 +                                       remaining,
 +                                       current);
 +    }
 +
      public PagingState state()
      {
          // Sets current to the first non-exhausted pager

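The new withUpdatedLimit(newLimits) above is a copy-on-write update: it clones the pager array, swaps only the sub-pager at the current position, and returns a fresh MultiPartitionPager, leaving the original untouched. A minimal sketch of the same pattern (SubPager is a stand-in for SinglePartitionPager):

    import java.util.Arrays;

    final class PagerUpdateSketch
    {
        static final class SubPager
        {
            final int limit;
            SubPager(int limit) { this.limit = limit; }
            SubPager withLimit(int newLimit) { return new SubPager(newLimit); }
        }

        final SubPager[] pagers;
        final int current;

        PagerUpdateSketch(SubPager[] pagers, int current)
        {
            this.pagers = pagers;
            this.current = current;
        }

        PagerUpdateSketch withUpdatedLimit(int newLimit)
        {
            SubPager[] copy = Arrays.copyOf(pagers, pagers.length); // shallow copy of the array
            copy[current] = copy[current].withLimit(newLimit);      // only the active pager changes
            return new PagerUpdateSketch(copy, current);            // original instance stays untouched
        }
    }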
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 15311ab,c26bf3f..1b56417
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@@ -54,9 -53,9 +54,9 @@@ public class QueryPager
          int count = 0;
          while (!pager.isExhausted())
          {
 -            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
 +            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime))
              {
-                 DataLimits.Counter counter = limits.newCounter(nowInSec, true);
+                 DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition());
                  PartitionIterators.consume(counter.applyTo(iter));
                  count += counter.counted();
              }

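The loop above shows the counting pattern in QueryPagers: drain the query page by page, count each page with a fresh counter, and accumulate the total; the new selectsFullPartition() argument simply rides along into newCounter(). A simplified model of the drain-and-count loop, with pages modeled as plain lists instead of partition iterators:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    final class PageCountSketch
    {
        static int countAllRows(Iterator<List<String>> pages)
        {
            int count = 0;
            while (pages.hasNext())               // ~ while (!pager.isExhausted())
            {
                List<String> page = pages.next(); // ~ pager.fetchPage(pageSize, ...)
                count += page.size();             // ~ counter.counted() for this page
            }
            return count;
        }

        public static void main(String[] args)
        {
            List<List<String>> pages = Arrays.asList(Arrays.asList("r1", "r2"),
                                                     Arrays.asList("r3"));
            System.out.println(countAllRows(pages.iterator())); // 3
        }
    }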
http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 2dbcfff,ba23c67..2542db0
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -724,17 -719,17 +724,17 @@@ public abstract class CQLTeste
          return sessionNet(protocolVersion).execute(formatQuery(query), 
values);
      }
  
-     protected Session sessionNet()
+     protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
      {
-         return sessionNet(getDefaultVersion());
+         return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
      }
  
-     protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable
+     protected Session sessionNet()
      {
-         return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize));
 -        return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1));
++        return sessionNet(getDefaultVersion());
      }
  
 -    protected Session sessionNet(int protocolVersion)
 +    protected Session sessionNet(ProtocolVersion protocolVersion)
      {
          requireNetwork();
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
index 21c48dd,7e90c0a..2442c2b
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java
@@@ -275,4 -165,257 +277,281 @@@ public class SelectLimitTest extends CQ
                     row(1, -1, 1, 1),
                     row(3, -1, 1, 1));
      }
+ 
+     @Test
+     public void testFilteringOnClusteringColumnsWithLimitAndStaticColumns() throws Throwable
+     {
+         // With only one clustering column
+         createTable("CREATE TABLE %s (a int, b int, s int static, c int, primary key (a, b))"
 -          + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}");
++             + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}");
+ 
+         for (int i = 0; i < 4; i++)
+         {
+             execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i);
+                 for (int j = 0; j < 3; j++)
+                     if (!((i == 0 || i == 3) && j == 1))
+                         execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", i, j, i + j);
+         }
+ 
+         beforeAndAfterFlush(() ->
+         {
+             assertRows(execute("SELECT * FROM %s"),
+                        row(0, 0, 0, 0),
+                        row(0, 2, 0, 2),
+                        row(1, 0, 1, 1),
+                        row(1, 1, 1, 2),
+                        row(1, 2, 1, 3),
+                        row(2, 0, 2, 2),
+                        row(2, 1, 2, 3),
+                        row(2, 2, 2, 4),
+                        row(3, 0, 3, 3),
+                        row(3, 2, 3, 5));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                        row(1, 1, 1, 2),
+                        row(2, 1, 2, 3));
+ 
+             // The problem was that the static row of partition 0 used to be filtered only in SelectStatement and was
+             // consequently counted as a row, in which case the query returned one row less.
+             assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"),
+                        row(1, 1, 1, 2),
+                        row(2, 1, 2, 3));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"),
+                        row(1, 1, 1, 2),
+                        row(2, 1, 2, 3));
+ 
+             // Test with paging
+             for (int pageSize = 1; pageSize < 4; pageSize++)
+             {
+                 assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                               row(1, 1, 1, 2),
+                               row(2, 1, 2, 3));
+ 
+                 assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                               row(1, 1, 1, 2),
+                               row(2, 1, 2, 3));
++
++                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 GROUP BY a LIMIT 2 ALLOW FILTERING", pageSize),
++                              row(1, 1, 1, 2),
++                              row(2, 1, 2, 3));
++
++                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 GROUP BY a LIMIT 2 ALLOW FILTERING", pageSize),
++                              row(1, 1, 1, 2),
++                              row(2, 1, 2, 3));
+             }
+         });
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3)"); // Load all data in the row cache
+ 
+         // Partition range queries
+         assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         // Multiple partitions queries
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2) AND b = 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2) AND b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 2),
+                    row(2, 1, 2, 3));
+ 
+         // Test with paging
+         for (int pageSize = 1; pageSize < 4; pageSize++)
+         {
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 1, 2),
+                           row(2, 1, 2, 3));
+ 
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 1, 2),
+                           row(2, 1, 2, 3));
+         }
+ 
+         // With multiple clustering columns
+         createTable("CREATE TABLE %s (a int, b int, c int, s int static, d int, primary key (a, b, c))"
+            + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}");
+ 
+         for (int i = 0; i < 3; i++)
+         {
+             execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i);
+                 for (int j = 0; j < 3; j++)
+                     if (!(i == 0 && j == 1))
+                         execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", i, j, j, i + j);
+         }
+ 
+         beforeAndAfterFlush(() ->
+         {
+             assertRows(execute("SELECT * FROM %s"),
+                        row(0, 0, 0, 0, 0),
+                        row(0, 2, 2, 0, 2),
+                        row(1, 0, 0, 1, 1),
+                        row(1, 1, 1, 1, 2),
+                        row(1, 2, 2, 1, 3),
+                        row(2, 0, 0, 2, 2),
+                        row(2, 1, 1, 2, 3),
+                        row(2, 2, 2, 2, 4));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                        row(1, 1, 1, 1, 2),
+                        row(2, 1, 1, 2, 3));
+ 
+             assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"),
+                        row(1, 1, 1, 1, 2),
+                        row(2, 1, 1, 2, 3));
+ 
+             // Test with paging
+             for (int pageSize = 1; pageSize < 4; pageSize++)
+             {
+                 assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                               row(1, 1, 1, 1, 2),
+                               row(2, 1, 1, 2, 3));
+ 
+                 assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                               row(1, 1, 1, 1, 2),
+                               row(2, 1, 1, 2, 3));
++
++                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 GROUP BY a, b LIMIT 2 ALLOW FILTERING", pageSize),
++                              row(1, 1, 1, 1, 2),
++                              row(2, 1, 1, 2, 3));
++
++                assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 GROUP BY a, b LIMIT 2 ALLOW FILTERING", pageSize),
++                              row(1, 1, 1, 1, 2),
++                              row(2, 1, 1, 2, 3));
+             }
+         });
+ 
+         execute("SELECT * FROM %s WHERE a IN (0, 1, 2)"); // Load data in the row cache
+ 
+         assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"),
+                    row(1, 1, 1, 1, 2),
+                    row(2, 1, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 1, 2),
+                    row(2, 1, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (1, 2, 3, 4) AND b = 1 ALLOW FILTERING"),
+                    row(1, 1, 1, 1, 2),
+                    row(2, 1, 1, 2, 3));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE a IN (1, 2, 3, 4) AND b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"),
+                    row(1, 1, 1, 1, 2),
+                    row(2, 1, 1, 2, 3));
+ 
+         // Test with paging
+         for (int pageSize = 1; pageSize < 4; pageSize++)
+         {
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 1, 1, 2),
+                           row(2, 1, 1, 2, 3));
+ 
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 1, 1, 2),
+                           row(2, 1, 1, 2, 3));
+         }
+     }
+ 
+     @Test
+     public void testIndexOnRegularColumnWithPartitionWithoutRows() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))");
+         createIndex("CREATE INDEX ON %s (v)");
+ 
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 1);
+         flush();
+ 
+         assertRows(execute("SELECT * FROM %s WHERE v = ?", 1),
+                    row(1, 1, 9, 1),
+                    row(3, 1, 9, 1),
+                    row(4, 1, 9, 1));
+ 
+         execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1);
+ 
+         // Test without paging
+         assertRows(execute("SELECT * FROM %s WHERE v = ? LIMIT 2", 1),
+                    row(1, 1, 9, 1),
+                    row(4, 1, 9, 1));
+ 
++        assertRows(execute("SELECT * FROM %s WHERE v = ? GROUP BY pk LIMIT 2", 1),
++                   row(1, 1, 9, 1),
++                   row(4, 1, 9, 1));
++
+         // Test with paging
+         for (int pageSize = 1; pageSize < 4; pageSize++)
+         {
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 LIMIT 2", pageSize),
+                           row(1, 1, 9, 1),
+                           row(4, 1, 9, 1));
++
++            assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 GROUP BY pk LIMIT 2", pageSize),
++                          row(1, 1, 9, 1),
++                          row(4, 1, 9, 1));
+         }
+     }
+ 
+     @Test
+     public void testFilteringWithPartitionWithoutRows() throws Throwable
+     {
+         createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))");
+ 
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1);
+         execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 1);
+         flush();
+ 
+         assertRows(execute("SELECT * FROM %s WHERE v = ? ALLOW FILTERING", 1),
+                    row(1, 1, 9, 1),
+                    row(3, 1, 9, 1),
+                    row(4, 1, 9, 1));
+ 
+         execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1);
+ 
+         // Test without paging
+         assertRows(execute("SELECT * FROM %s WHERE v = ? LIMIT 2 ALLOW FILTERING", 1),
+                    row(1, 1, 9, 1),
+                    row(4, 1, 9, 1));
+ 
+         assertRows(execute("SELECT * FROM %s WHERE pk IN ? AND v = ? LIMIT 2 ALLOW FILTERING", list(1, 3, 4), 1),
+                    row(1, 1, 9, 1),
+                    row(4, 1, 9, 1));
+ 
+         // Test with paging
+         for (int pageSize = 1; pageSize < 4; pageSize++)
+         {
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 9, 1),
+                           row(4, 1, 9, 1));
+ 
+             assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE pk IN (1, 3, 4) AND v = 1 LIMIT 2 ALLOW FILTERING", pageSize),
+                           row(1, 1, 9, 1),
+                           row(4, 1, 9, 1));
+         }
+     }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java
----------------------------------------------------------------------

