Merge branch cassandra-3.0 into cassandra-3.11
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7aa89a64 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7aa89a64 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7aa89a64 Branch: refs/heads/trunk Commit: 7aa89a64e09c57061418c1d83c03ae7cfd0cd745 Parents: bd89f56 e2445cf Author: Benjamin Lerer <b.le...@gmail.com> Authored: Fri Jul 14 17:18:19 2017 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Fri Jul 14 17:19:38 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 6 +- .../apache/cassandra/db/ColumnFamilyStore.java | 3 +- src/java/org/apache/cassandra/db/DataRange.java | 5 + .../cassandra/db/PartitionRangeReadCommand.java | 6 + .../org/apache/cassandra/db/ReadCommand.java | 2 +- src/java/org/apache/cassandra/db/ReadQuery.java | 12 + .../db/SinglePartitionReadCommand.java | 22 +- .../apache/cassandra/db/filter/DataLimits.java | 82 +++--- .../apache/cassandra/db/filter/RowFilter.java | 15 + .../apache/cassandra/service/CacheService.java | 2 +- .../apache/cassandra/service/DataResolver.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 8 +- .../service/pager/AbstractQueryPager.java | 2 +- .../service/pager/MultiPartitionPager.java | 9 +- .../cassandra/service/pager/QueryPagers.java | 2 +- .../org/apache/cassandra/cql3/CQLTester.java | 8 +- .../validation/operations/SelectLimitTest.java | 279 +++++++++++++++++++ .../db/rows/UnfilteredRowIteratorsTest.java | 10 +- 18 files changed, 418 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index e7ad6fb,c916452..edd66e2 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -6,53 -2,16 +6,53 @@@ Merged from 3.0 * Make concat work with iterators that have different subsets of columns (CASSANDRA-13482) * Set test.runners based on cores and memory size (CASSANDRA-13078) * Allow different NUMACTL_ARGS to be passed in (CASSANDRA-13557) - * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606) * Fix secondary index queries on COMPACT tables (CASSANDRA-13627) * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568) - Merged from 2.2: - * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223) - * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272) - * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592) - * Fix nested Tuples/UDTs validation (CASSANDRA-13646) +Merged from 2.2: - * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272) - * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592) - * Fix nested Tuples/UDTs validation (CASSANDRA-13646) ++ * Fix queries with LIMIT and filtering on clustering columns (CASSANDRA-11223) ++ * Fix potential NPE when resume bootstrap fails (CASSANDRA-13272) ++ * Fix toJSONString for the UDT, tuple and collection types (CASSANDRA-13592) * Fix nested Tuples/UDTs validation (CASSANDRA-13646) -3.0.14 +3.11.0 + * Allow native function calls in CQLSSTableWriter (CASSANDRA-12606) + * Replace string comparison with regex/number checks in MessagingService test (CASSANDRA-13216) + * Fix formatting of duration columns in CQLSH (CASSANDRA-13549) + * Fix the problem with duplicated rows when using paging with SASI 
(CASSANDRA-13302) + * Allow CONTAINS statements filtering on the partition key and its parts (CASSANDRA-13275) + * Fall back to even ranges calculation in clusters with vnodes when tokens are distributed unevenly (CASSANDRA-13229) + * Fix duration type validation to prevent overflow (CASSANDRA-13218) + * Forbid unsupported creation of SASI indexes over partition key columns (CASSANDRA-13228) + * Reject multiple values for a key in CQL grammar. (CASSANDRA-13369) + * UDA fails without input rows (CASSANDRA-13399) + * Fix compaction-stress by using daemonInitialization (CASSANDRA-13188) + * V5 protocol flags decoding broken (CASSANDRA-13443) + * Use write lock not read lock for removing sstables from compaction strategies. (CASSANDRA-13422) + * Use corePoolSize equal to maxPoolSize in JMXEnabledThreadPoolExecutors (CASSANDRA-13329) + * Avoid rebuilding SASI indexes containing no values (CASSANDRA-12962) + * Add charset to Analyser input stream (CASSANDRA-13151) + * Fix testLimitSSTables flake caused by concurrent flush (CASSANDRA-12820) + * cdc column addition strikes again (CASSANDRA-13382) + * Fix static column indexes (CASSANDRA-13277) + * DataOutputBuffer.asNewBuffer broken (CASSANDRA-13298) + * unittest CipherFactoryTest failed on MacOS (CASSANDRA-13370) + * Forbid SELECT restrictions and CREATE INDEX over non-frozen UDT columns (CASSANDRA-13247) + * Default logging we ship will incorrectly print "?:?" for "%F:%L" pattern (CASSANDRA-13317) + * Possible AssertionError in UnfilteredRowIteratorWithLowerBound (CASSANDRA-13366) + * Support unaligned memory access for AArch64 (CASSANDRA-13326) + * Improve SASI range iterator efficiency on intersection with an empty range (CASSANDRA-12915). + * Fix equality comparisons of columns using the duration type (CASSANDRA-13174) + * Obfuscate password in stress-graphs (CASSANDRA-12233) + * Move to FastThreadLocalThread and FastThreadLocal (CASSANDRA-13034) + * nodetool stopdaemon errors out (CASSANDRA-13030) + * Tables in system_distributed should not use gcgs of 0 (CASSANDRA-12954) + * Fix primary index calculation for SASI (CASSANDRA-12910) + * More fixes to the TokenAllocator (CASSANDRA-12990) + * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) + * Address message coalescing regression (CASSANDRA-12676) + * Delete illegal character from StandardTokenizerImpl.jflex (CASSANDRA-13417) + * Fix cqlsh automatic protocol downgrade regression (CASSANDRA-13307) + * Tracing payload not passed from QueryMessage to tracing session (CASSANDRA-12835) +Merged from 3.0: * Ensure int overflow doesn't occur when calculating large partition warning size (CASSANDRA-13172) * Ensure consistent view of partition columns between coordinator and replica in ColumnFilter (CASSANDRA-13004) * Failed unregistering mbean during drop keyspace (CASSANDRA-13346) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/ReadQuery.java
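The ReadQuery.java hunk is elided from this message, but the diffstat (12 lines added) and the call sites below (SinglePartitionReadCommand.Group, DataResolver, StorageProxy, QueryPagers) imply a new selectsFullPartition() method on the query interface. A minimal sketch of its likely shape, for orientation only; the javadoc wording here is an assumption, not the committed text:

    public interface ReadQuery
    {
        /**
         * Whether this query selects the full contents of the partitions it reads,
         * i.e. whether it carries no filter on clustering or regular columns that
         * could exclude some of their rows. Counters use this to decide whether a
         * partition whose only live data is a static row should be counted as a
         * result (see the DataLimits changes below).
         */
        boolean selectsFullPartition();
    }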
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 7d72212,319eeb4..11444db --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -479,6 -462,6 +479,7 @@@ public class SinglePartitionReadComman // We want to cache only rowsToCache rows CachedPartition toCache = CachedBTreePartition.create(toCacheIterator, nowInSec()); ++ if (sentinelSuccess && !toCache.isEmpty()) { Tracing.trace("Caching {} rows", toCache.rowCount()); @@@ -1047,47 -1015,32 +1057,55 @@@ return commands.get(0).metadata(); } + @Override + public boolean selectsFullPartition() + { + return selectsFullPartitions; + } + - public ReadOrderGroup startOrderGroup() + public ReadExecutionController executionController() { // Note that the only difference between the commands in a group must be the partition key on which // they are applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one. - return commands.get(0).startOrderGroup(); + return commands.get(0).executionController(); } - public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + public PartitionIterator executeInternal(ReadExecutionController controller) { - return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), nowInSec); - List<PartitionIterator> partitions = new ArrayList<>(commands.size()); - for (SinglePartitionReadCommand cmd : commands) - partitions.add(cmd.executeInternal(orderGroup)); - - // Because we only have enforce the limit per command, we need to enforce it globally. - return limits.filter(PartitionIterators.concat(partitions), ++ return limits.filter(UnfilteredPartitionIterators.filter(executeLocally(controller, false), nowInSec), + nowInSec, + selectsFullPartitions); } - public QueryPager getPager(PagingState pagingState, int protocolVersion) + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return executeLocally(executionController, true); + } + + /** + * Implementation of {@link ReadQuery#executeLocally(ReadExecutionController)}. + * + * @param executionController - the {@code ReadExecutionController} protecting the read. + * @param sort - whether to sort the inner commands by partition key, required for merging the iterator + * later on. This will be false when called by {@link ReadQuery#executeInternal(ReadExecutionController)} + * because in this case it is safe to do so as there is no merging involved and we don't want to + * change the old behavior which was to not sort by partition. + * + * @return - the iterator that can be used to retrieve the query result.
+ */ + private UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController, boolean sort) + { + List<Pair<DecoratedKey, UnfilteredPartitionIterator>> partitions = new ArrayList<>(commands.size()); + for (SinglePartitionReadCommand cmd : commands) + partitions.add(Pair.of(cmd.partitionKey, cmd.executeLocally(executionController))); + + if (sort) + Collections.sort(partitions, (p1, p2) -> p1.getLeft().compareTo(p2.getLeft())); + + return UnfilteredPartitionIterators.concat(partitions.stream().map(p -> p.getRight()).collect(Collectors.toList())); + } + + public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) { if (commands.size() == 1) return SinglePartitionReadCommand.getPager(commands.get(0), pagingState, protocolVersion); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/filter/DataLimits.java index abe029b,48ec06a..0c8cd37 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@@ -135,18 -109,7 +139,18 @@@ public abstract class DataLimit public abstract DataLimits forShortReadRetry(int toFetch); + /** + * Creates a <code>DataLimits</code> instance to be used for internally paginating GROUP BY queries. + * + * @param state the <code>GroupMaker</code> state + * @return a <code>DataLimits</code> instance to be used for internally paginating GROUP BY queries + */ + public DataLimits forGroupByInternalPaging(GroupingState state) + { + throw new UnsupportedOperationException(); + } + - public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec); + public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData); /** * Returns a new {@code Counter} for these limits. @@@ -171,25 -136,23 +177,29 @@@ public abstract int perPartitionCount(); + /** + * Returns equivalent limits but where any internal state kept to track where we are in terms of paging and/or grouping is
+ */ + public abstract DataLimits withoutState(); + - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, + int nowInSec, + boolean countPartitionsWithOnlyStaticData) { - return this.newCounter(nowInSec, false).applyTo(iter); + return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); } - public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) + public UnfilteredRowIterator filter(UnfilteredRowIterator iter, + int nowInSec, + boolean countPartitionsWithOnlyStaticData) { - return this.newCounter(nowInSec, false).applyTo(iter); + return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData).applyTo(iter); } - public PartitionIterator filter(PartitionIterator iter, int nowInSec) + public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData) { - return this.newCounter(nowInSec, true).applyTo(iter); + return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData).applyTo(iter); } /** @@@ -429,14 -352,20 +439,16 @@@ protected class CQLCounter extends Counter { - protected final int nowInSec; - protected final boolean assumeLiveData; - protected final boolean countPartitionsWithOnlyStaticData; - protected int rowCounted; protected int rowInCurrentPartition; ++ protected final boolean countPartitionsWithOnlyStaticData; protected boolean hasLiveStaticRow; - public CQLCounter(int nowInSec, boolean assumeLiveData) + public CQLCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) { - this.nowInSec = nowInSec; - this.assumeLiveData = assumeLiveData; + super(nowInSec, assumeLiveData); + this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; } @Override @@@ -554,15 -473,9 +566,15 @@@ } @Override + public DataLimits withoutState() + { + return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData) + public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) { - return new PagingAwareCounter(nowInSec, assumeLiveData); + return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); } private class PagingAwareCounter extends CQLCounter @@@ -593,499 -506,6 +605,502 @@@ } /** + * <code>CQLLimits</code> used for GROUP BY queries or queries with aggregates. + * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. 
As a consequence, + the limits keep track of the number of rows as well as the number of groups.</p> + <p>A group can only be counted if the next group or the end of the data is reached.</p> + */ + private static class CQLGroupByLimits extends CQLLimits + { + /** + * The <code>GroupMaker</code> state + */ + protected final GroupingState state; + + /** + * The GROUP BY specification + */ + protected final AggregationSpecification groupBySpec; + + /** + * The limit on the number of groups + */ + protected final int groupLimit; + + /** + * The limit on the number of groups per partition + */ + protected final int groupPerPartitionLimit; + + public CQLGroupByLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec) + { + this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE); + } + + private CQLGroupByLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec, + GroupingState state) + { + super(rowLimit, NO_LIMIT, false); + this.groupLimit = groupLimit; + this.groupPerPartitionLimit = groupPerPartitionLimit; + this.groupBySpec = groupBySpec; + this.state = state; + } + + @Override + public Kind kind() + { + return Kind.CQL_GROUP_BY_LIMIT; + } + + @Override + public boolean isGroupByLimit() + { + return true; + } + + public boolean isUnlimited() + { + return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT; + } + + public DataLimits forShortReadRetry(int toFetch) + { + return new CQLLimits(toFetch); + } + + @Override + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // For the moment, we return the estimated number of rows as we have no good way of estimating + // the number of groups that will be returned. Hopefully, we should be able to fix + // that problem at some point. + return super.estimateTotalResults(cfs); + } + + @Override + public DataLimits forPaging(int pageSize) + { + return new CQLGroupByLimits(pageSize, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + return new CQLGroupByPagingLimits(pageSize, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state, + lastReturnedKey, + lastReturnedKeyRemaining); + } + + @Override + public DataLimits forGroupByInternalPaging(GroupingState state) + { + return new CQLGroupByLimits(rowLimit, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { - return new GroupByAwareCounter(nowInSec, assumeLiveData); ++ return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + @Override + public int count() + { + return groupLimit; + } + + @Override + public int perPartitionCount() + { + return groupPerPartitionLimit; + } + + @Override + public DataLimits withoutState() + { + return state == GroupingState.EMPTY_STATE + ? 
this + : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + + if (groupLimit != NO_LIMIT) + { + sb.append("GROUP LIMIT ").append(groupLimit); + if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT) + sb.append(' '); + } + + if (groupPerPartitionLimit != NO_LIMIT) + { + sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit); + if (rowLimit != NO_LIMIT) + sb.append(' '); + } + + if (rowLimit != NO_LIMIT) + { + sb.append("LIMIT ").append(rowLimit); + } + + return sb.toString(); + } + + @Override + public boolean isExhausted(Counter counter) + { + return ((GroupByAwareCounter) counter).rowCounted < rowLimit + && counter.counted() < groupLimit; + } + + protected class GroupByAwareCounter extends Counter + { + private final GroupMaker groupMaker; + ++ protected final boolean countPartitionsWithOnlyStaticData; ++ + /** + * The key of the partition being processed. + */ + protected DecoratedKey currentPartitionKey; + + /** + * The number of rows counted so far. + */ + protected int rowCounted; + + /** + * The number of rows counted so far in the current partition. + */ + protected int rowCountedInCurrentPartition; + + /** + * The number of groups counted so far. A group is counted only once it is complete + * (e.g. the next one has been reached). + */ + protected int groupCounted; + + /** + * The number of groups in the current partition. + */ + protected int groupInCurrentPartition; + + protected boolean hasGroupStarted; + + protected boolean hasLiveStaticRow; + + protected boolean hasReturnedRowsFromCurrentPartition; + - private GroupByAwareCounter(int nowInSec, boolean assumeLiveData) ++ private GroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { + super(nowInSec, assumeLiveData); + this.groupMaker = groupBySpec.newGroupMaker(state); ++ this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData; + + // If the end of the partition was reached at the same time as the row limit, the last group might + // not have been counted yet. Because of that, we need to guess, based on the state, if the previous group + // is still open. + hasGroupStarted = state.hasClustering(); + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(state.partitionKey())) + { + // The only case where we could have state.partitionKey() equal to the partition key + // is if some of the partition rows have been returned in the previous page but the + // partition was not exhausted (as the state partition key has not been updated yet). + // Since we know we have returned rows, we know we have already accounted for + // the static row, if any, so force hasLiveStaticRow to false to make sure we do not count it + // once more. + hasLiveStaticRow = false; + hasReturnedRowsFromCurrentPartition = true; + hasGroupStarted = true; + } + else + { + // We need to increment our count of groups if we have reached a new one, unless no new + // content has been added since we closed our last group (that is, we only increment if hasGroupStarted). Note that we may get
Note that we may get + // here with hasGroupStarted == false in the following cases: + // * the partition limit was reached for the previous partition + // * the previous partition was containing only one static row + // * the rows of the last group of the previous partition were all marked as deleted + if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING)) + { + incrementGroupCount(); + // If we detect, before starting the new partition, that we are done, we need to increase + // the per partition group count of the previous partition as the next page will start from + // there. + if (isDone()) + incrementGroupInCurrentPartitionCount(); + hasGroupStarted = false; + } + hasReturnedRowsFromCurrentPartition = false; + hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow); + } + currentPartitionKey = partitionKey; + // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition + // because the pager need to retrieve the count associated to the last value it has returned. + if (!isDone()) + { + groupInCurrentPartition = 0; + rowCountedInCurrentPartition = 0; + } + } + + @Override + protected Row applyToStatic(Row row) + { + // It's possible that we're "done" if the partition we just started bumped the number of groups (in + // applyToPartition() above), in which case Transformation will still call this method. In that case, we + // want to ignore the static row, it should (and will) be returned with the next page/group if needs be. + if (isDone()) + { + hasLiveStaticRow = false; // The row has not been returned + return Rows.EMPTY_STATIC_ROW; + } + return row; + } + + @Override + public Row applyToRow(Row row) + { + // We want to check if the row belongs to a new group even if it has been deleted. The goal being + // to minimize the chances of having to go through the same data twice if we detect on the next + // non deleted row that we have reached the limit. + if (groupMaker.isNewGroup(currentPartitionKey, row.clustering())) + { + if (hasGroupStarted) + { + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + } + hasGroupStarted = false; + } + + // That row may have made us increment the group count, which may mean we're done for this partition, in + // which case we shouldn't count this row (it won't be returned). 
+ if (isDoneForPartition()) + { + hasGroupStarted = false; + return null; + } + + if (isLive(row)) + { + hasGroupStarted = true; + incrementRowCount(); + hasReturnedRowsFromCurrentPartition = true; + } + + return row; + } + + @Override + public int counted() + { + return groupCounted; + } + + @Override + public int countedInCurrentPartition() + { + return groupInCurrentPartition; + } + + @Override + public int rowCounted() + { + return rowCounted; + } + + @Override + public int rowCountedInCurrentPartition() + { + return rowCountedInCurrentPartition; + } + + protected void incrementRowCount() + { + rowCountedInCurrentPartition++; + if (++rowCounted >= rowLimit) + stop(); + } + + private void incrementGroupCount() + { + groupCounted++; + if (groupCounted >= groupLimit) + stop(); + } + + private void incrementGroupInCurrentPartitionCount() + { + groupInCurrentPartition++; + if (groupInCurrentPartition >= groupPerPartitionLimit) + stopInPartition(); + } + + @Override + public boolean isDoneForPartition() + { + return isDone() || groupInCurrentPartition >= groupPerPartitionLimit; + } + + @Override + public boolean isDone() + { + return groupCounted >= groupLimit; + } + + @Override + public void onPartitionClose() + { + // Normally, we don't count static rows because, from a CQL point of view, they will be merged with other + // rows in the partition. However, if we only have the static row, it will be returned as one group, + // so count it. - if (hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition) ++ if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition) + { + incrementRowCount(); + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + hasGroupStarted = false; + } + super.onPartitionClose(); + } + + @Override + public void onClose() + { + // Groups are only counted when the end of the group is reached. + // The end of a group is detected in two ways: + // 1) a new group is reached + // 2) the end of the data is reached + // We know that the end of the data is reached if the group limit has not been reached + // and the number of rows counted is smaller than the internal page size.
+ if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit) + { + incrementGroupCount(); + incrementGroupInCurrentPartitionCount(); + } + + super.onClose(); + } + } + } + + private static class CQLGroupByPagingLimits extends CQLGroupByLimits + { + private final ByteBuffer lastReturnedKey; + + private final int lastReturnedKeyRemaining; + + public CQLGroupByPagingLimits(int groupLimit, + int groupPerPartitionLimit, + int rowLimit, + AggregationSpecification groupBySpec, + GroupingState state, + ByteBuffer lastReturnedKey, + int lastReturnedKeyRemaining) + { + super(groupLimit, + groupPerPartitionLimit, + rowLimit, + groupBySpec, + state); + + this.lastReturnedKey = lastReturnedKey; + this.lastReturnedKeyRemaining = lastReturnedKeyRemaining; + } + + @Override + public Kind kind() + { + return Kind.CQL_GROUP_BY_PAGING_LIMIT; + } + + @Override + public DataLimits forPaging(int pageSize) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forGroupByInternalPaging(GroupingState state) + { + throw new UnsupportedOperationException(); + } + + @Override - public Counter newCounter(int nowInSec, boolean assumeLiveData) ++ public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { + assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey()); - return new PagingGroupByAwareCounter(nowInSec, assumeLiveData); ++ return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + @Override + public DataLimits withoutState() + { + return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec); + } + + private class PagingGroupByAwareCounter extends GroupByAwareCounter + { - private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData) ++ private PagingGroupByAwareCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData) + { - super(nowInSec, assumeLiveData); ++ super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData); + } + + @Override + public void applyToPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(lastReturnedKey)) + { + currentPartitionKey = partitionKey; + groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining; + hasReturnedRowsFromCurrentPartition = true; + hasLiveStaticRow = false; + hasGroupStarted = state.hasClustering(); + } + else + { + super.applyToPartition(partitionKey, staticRow); + } + } + } + } + + /** * Limits used by Thrift; these count partitions and cells.
*/ private static class ThriftLimits extends DataLimits http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/CacheService.java index a4e18c0,d23bdb0..e209450 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@@ -439,10 -441,9 +439,10 @@@ public class CacheService implements Ca { DecoratedKey key = cfs.decorateKey(buffer); int nowInSec = FBUtilities.nowInSeconds(); - try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op)) + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key); + try (ReadExecutionController controller = cmd.executionController(); UnfilteredRowIterator iter = cmd.queryMemtableAndDisk(cfs, controller)) { - CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec); + CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec, true), nowInSec); return Pair.create(new RowCacheKey(cfs.metadata.ksAndCFName, key), (IRowCacheEntry)toCache); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/DataResolver.java index f1c4dd1,c96a893..0dd1e7e --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@@ -406,14 -390,12 +406,14 @@@ public class DataResolver extends Respo private final InetAddress source; private final DataLimits.Counter counter; private final DataLimits.Counter postReconciliationCounter; + private final long queryStartNanoTime; - private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter) + private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime) { this.source = source; - this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount(); + this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); this.postReconciliationCounter = postReconciliationCounter; + this.queryStartNanoTime = queryStartNanoTime; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageProxy.java index 0da25a9,6610cf7..b8f87d9 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@@ -2281,7 -2164,9 +2281,9 @@@ public class StorageProxy implements St // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally. 
- return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec()); - return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), ++ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), + command.nowInSec(), + command.selectsFullPartition()); } public Map<String, List<String>> getSchemaVersions() http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index da388d0,11bbc0e..9dae11c --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@@ -84,27 -84,6 +85,31 @@@ public class MultiPartitionPager implem remaining = state == null ? limit.count() : state.remaining; } - private MultiPartitionPager(SinglePartitionPager[] pagers, DataLimits limit, int nowInSec, int remaining, int current) ++ private MultiPartitionPager(SinglePartitionPager[] pagers, ++ DataLimits limit, ++ int nowInSec, ++ int remaining, ++ int current) + { + this.pagers = pagers; + this.limit = limit; + this.nowInSec = nowInSec; + this.remaining = remaining; + this.current = current; + } + + public QueryPager withUpdatedLimit(DataLimits newLimits) + { + SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length); + newPagers[current] = newPagers[current].withUpdatedLimit(newLimits); + + return new MultiPartitionPager(newPagers, + newLimits, + nowInSec, + remaining, + current); + } + public PagingState state() { // Sets current to the first non-exhausted pager http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/QueryPagers.java index 15311ab,c26bf3f..1b56417 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@@ -54,9 -53,9 +54,9 @@@ public class QueryPager int count = 0; while (!pager.isExhausted()) { - try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state)) + try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime)) { - DataLimits.Counter counter = limits.newCounter(nowInSec, true); + DataLimits.Counter counter = limits.newCounter(nowInSec, true, command.selectsFullPartition()); PartitionIterators.consume(counter.applyTo(iter)); count += counter.counted(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index 2dbcfff,ba23c67..2542db0 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -724,17 -719,17 +724,17 @@@ public abstract class CQLTeste return sessionNet(protocolVersion).execute(formatQuery(query), values); } - protected Session sessionNet() + protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable { - return sessionNet(getDefaultVersion()); + return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize)); } - protected com.datastax.driver.core.ResultSet executeNetWithPaging(String query, int pageSize) throws Throwable + protected Session sessionNet() { - return sessionNet().execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize)); - return sessionNet(PROTOCOL_VERSIONS.get(PROTOCOL_VERSIONS.size() - 1)); ++ return sessionNet(getDefaultVersion()); } - protected Session sessionNet(int protocolVersion) + protected Session sessionNet(ProtocolVersion protocolVersion) { requireNetwork(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java index 21c48dd,7e90c0a..2442c2b --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectLimitTest.java @@@ -275,4 -165,257 +277,281 @@@ public class SelectLimitTest extends CQ row(1, -1, 1, 1), row(3, -1, 1, 1)); } + + @Test + public void testFilteringOnClusteringColumnsWithLimitAndStaticColumns() throws Throwable + { + // With only one clustering column + createTable("CREATE TABLE %s (a int, b int, s int static, c int, primary key (a, b))" - + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}"); ++ + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}"); + + for (int i = 0; i < 4; i++) + { + execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i); + for (int j = 0; j < 3; j++) + if (!((i == 0 || i == 3) && j == 1)) + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", i, j, i + j); + } + + beforeAndAfterFlush(() -> + { + assertRows(execute("SELECT * FROM %s"), + row(0, 0, 0, 0), + row(0, 2, 0, 2), + row(1, 0, 1, 1), + row(1, 1, 1, 2), + row(1, 2, 1, 3), + row(2, 0, 2, 2), + row(2, 1, 2, 3), + row(2, 2, 2, 4), + row(3, 0, 3, 3), + row(3, 2, 3, 5)); + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // The problem was that the static row of partition 0 used to be filtered only in SelectStatement and was + consequently counted as a row, in which case the query returned one row less.
+ assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); ++ ++ assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 GROUP BY a LIMIT 2 ALLOW FILTERING", pageSize), ++ row(1, 1, 1, 2), ++ row(2, 1, 2, 3)); ++ ++ assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 GROUP BY a LIMIT 2 ALLOW FILTERING", pageSize), ++ row(1, 1, 1, 2), ++ row(2, 1, 2, 3)); + } + }); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b = 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3) AND b >= 1 AND b <= 1 ORDER BY b DESC LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + execute("SELECT * FROM %s WHERE a IN (0, 1, 2, 3)"); // Load all data in the row cache + + // Partition range queries + assertRows(execute("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // Multiple partitions queries + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2) AND b = 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (0, 1, 2) AND b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b >= 1 AND b <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 2), + row(2, 1, 2, 3)); + } + + // With multiple clustering columns + createTable("CREATE TABLE %s (a int, b int, c int, s int static, d int, primary key (a, b, c))" + + " WITH caching = {'keys': 'ALL', 'rows_per_partition' : 'ALL'}"); + + for (int i = 0; i < 3; i++) + { + execute("INSERT INTO %s (a, s) VALUES (?, ?)", i, i); + for (int j = 0; j < 3; j++) + if (!(i == 0 && j == 1)) + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", i, j, j, i + j); + } + + beforeAndAfterFlush(() -> + { + assertRows(execute("SELECT * FROM %s"), + row(0, 0, 0, 0, 0), + row(0, 2, 2, 0, 2), + row(1, 0, 0, 1, 1), + row(1, 1, 1, 1, 2), + row(1, 2, 2, 1, 3), + row(2, 0, 0, 2, 2), + row(2, 1, 1, 2, 3), + row(2, 2, 2, 2, 4)); + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c 
<= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); ++ ++ assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 GROUP BY a, b LIMIT 2 ALLOW FILTERING", pageSize), ++ row(1, 1, 1, 1, 2), ++ row(2, 1, 1, 2, 3)); ++ ++ assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 GROUP BY a, b LIMIT 2 ALLOW FILTERING", pageSize), ++ row(1, 1, 1, 1, 2), ++ row(2, 1, 1, 2, 3)); + } + }); + + execute("SELECT * FROM %s WHERE a IN (0, 1, 2)"); // Load data in the row cache + + assertRows(execute("SELECT * FROM %s WHERE b = 1 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (1, 2, 3, 4) AND b = 1 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRows(execute("SELECT * FROM %s WHERE a IN (1, 2, 3, 4) AND b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING"), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE b IN (1, 2, 3, 4) AND c >= 1 AND c <= 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 1, 1, 2), + row(2, 1, 1, 2, 3)); + } + } + + @Test + public void testIndexOnRegularColumnWithPartitionWithoutRows() throws Throwable + { + createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))"); + createIndex("CREATE INDEX ON %s (v)"); + + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 1); + flush(); + + assertRows(execute("SELECT * FROM %s WHERE v = ?", 1), + row(1, 1, 9, 1), + row(3, 1, 9, 1), + row(4, 1, 9, 1)); + + execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1); + + // Test without paging + assertRows(execute("SELECT * FROM %s WHERE v = ? LIMIT 2", 1), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + ++ assertRows(execute("SELECT * FROM %s WHERE v = ? 
GROUP BY pk LIMIT 2", 1), ++ row(1, 1, 9, 1), ++ row(4, 1, 9, 1)); ++ + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 LIMIT 2", pageSize), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); ++ ++ assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 GROUP BY pk LIMIT 2", pageSize), ++ row(1, 1, 9, 1), ++ row(4, 1, 9, 1)); + } + } + + @Test + public void testFilteringWithPartitionWithoutRows() throws Throwable + { + createTable("CREATE TABLE %s (pk int, c int, s int static, v int, PRIMARY KEY(pk, c))"); + + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 1, 2, 9, 2); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 3, 1, 9, 1); + execute("INSERT INTO %s (pk, c, s, v) VALUES (?, ?, ?, ?)", 4, 1, 9, 1); + flush(); + + assertRows(execute("SELECT * FROM %s WHERE v = ? ALLOW FILTERING", 1), + row(1, 1, 9, 1), + row(3, 1, 9, 1), + row(4, 1, 9, 1)); + + execute("DELETE FROM %s WHERE pk = ? and c = ?", 3, 1); + + // Test without paging + assertRows(execute("SELECT * FROM %s WHERE v = ? LIMIT 2 ALLOW FILTERING", 1), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + assertRows(execute("SELECT * FROM %s WHERE pk IN ? AND v = ? LIMIT 2 ALLOW FILTERING", list(1, 3, 4), 1), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + // Test with paging + for (int pageSize = 1; pageSize < 4; pageSize++) + { + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE v = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + + assertRowsNet(executeNetWithPaging("SELECT * FROM %s WHERE pk IN (1, 3, 4) AND v = 1 LIMIT 2 ALLOW FILTERING", pageSize), + row(1, 1, 9, 1), + row(4, 1, 9, 1)); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7aa89a64/test/unit/org/apache/cassandra/db/rows/UnfilteredRowIteratorsTest.java ----------------------------------------------------------------------
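Two illustrative sketches follow, to make the counting changes above concrete. First, DataLimits enforces limits by wrapping an iterator with a Counter (newCounter(...).applyTo(iter)), and the new countPartitionsWithOnlyStaticData flag decides whether a partition whose only live data is a static row consumes part of the limit. A simplified model of that rule using hypothetical stand-in types, not the Cassandra classes:

    // Toy stand-in for a DataLimits.Counter; names and types are hypothetical.
    final class RowLimitModel
    {
        private final int rowLimit;
        private final boolean countPartitionsWithOnlyStaticData;
        private int counted;

        RowLimitModel(int rowLimit, boolean countPartitionsWithOnlyStaticData)
        {
            this.rowLimit = rowLimit;
            this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
        }

        // Accounts for one partition; returns whether iteration may continue.
        boolean applyToPartition(int liveRows, boolean hasLiveStaticRow)
        {
            if (liveRows > 0)
                counted += liveRows;
            else if (hasLiveStaticRow && countPartitionsWithOnlyStaticData)
                counted++; // a static-only partition counts as a single result
            // When the flag is false (e.g. when filtering on clustering columns),
            // a static-only partition no longer consumes part of the LIMIT.
            return counted < rowLimit;
        }
    }

In the tests above, "SELECT * FROM %s WHERE b = 1 LIMIT 2 ALLOW FILTERING" hits exactly this case: partition 0 contributes only a static row, and with the flag false it no longer uses up one of the two LIMIT slots, so both matching rows are returned.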
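Second, the group-counting rule of GroupByAwareCounter: a group is counted only once it is complete, that is, when the next group or the end of the data is reached (see onClose() above). A self-contained toy version of that rule over a sorted list of keys, an illustration rather than the committed logic:

    import java.util.Arrays;
    import java.util.List;

    public class GroupCountModel
    {
        // Counts groups of equal adjacent keys, stopping once groupLimit groups
        // are complete. A group is only counted when the next group (or the end
        // of the data) is reached, mirroring GroupByAwareCounter.
        static int countGroups(List<String> sortedKeys, int groupLimit)
        {
            int counted = 0;
            String current = null;
            boolean open = false;          // plays the role of hasGroupStarted
            for (String key : sortedKeys)
            {
                if (open && !key.equals(current))
                {
                    counted++;             // the previous group is now complete
                    if (counted >= groupLimit)
                        return counted;    // stop(), as in the real counter
                }
                current = key;
                open = true;
            }
            if (open)
                counted++;                 // onClose(): close the last open group
            return counted;
        }

        public static void main(String[] args)
        {
            // Groups "a" and "b" complete; with groupLimit = 2 we stop before "c".
            System.out.println(countGroups(Arrays.asList("a", "a", "b", "c"), 2)); // 2
        }
    }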