Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 f4dab0f9b -> 7e6c1d548
Fix SELECT statement with IN restrictions on partition key, ORDER BY and LIMIT patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-10729 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7e6c1d54 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7e6c1d54 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7e6c1d54 Branch: refs/heads/cassandra-3.0 Commit: 7e6c1d5483b35ab911113dff0f5fd559760d733b Parents: f4dab0f Author: Benjamin Lerer <[email protected]> Authored: Thu Nov 26 17:40:33 2015 +0100 Committer: Benjamin Lerer <[email protected]> Committed: Thu Nov 26 17:40:33 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 2 + NEWS.txt | 9 ++ .../cql3/statements/SelectStatement.java | 91 ++++++++++++++------ .../apache/cassandra/db/filter/DataLimits.java | 20 +++-- .../operations/SelectOrderByTest.java | 41 +++++++++ 5 files changed, 127 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index eab04fb..e1a959a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0.1 + * Fix SELECT statement with IN restrictions on partition key, + ORDER BY and LIMIT (CASSANDRA-10729) * Improve stress performance over 1k threads (CASSANDRA-7217) * Wait for migration responses to complete before bootstrapping (CASSANDRA-10731) * Unable to create a function with argument of type Inet (CASSANDRA-10741) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index d971d5e..02a9525 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -13,6 +13,15 @@ restore snapshots created with the previous 
major version using the 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +3.0.1 +===== + +Upgrading +--------- + - The return value of SelectStatement::getLimit has been changed from DataLimits + to int. + + 3.0 === http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index ab1da45..a9bb121 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -202,15 +202,16 @@ public class SelectStatement implements CQLStatement cl.validateForRead(keyspace()); int nowInSec = FBUtilities.nowInSeconds(); - ReadQuery query = getQuery(options, nowInSec); + int userLimit = getLimit(options); + ReadQuery query = getQuery(options, nowInSec, userLimit); int pageSize = getPageSize(options); if (pageSize <= 0 || query.limits().count() <= pageSize) - return execute(query, options, state, nowInSec); + return execute(query, options, state, nowInSec, userLimit); QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion()); - return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec); + return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit); } private int getPageSize(QueryOptions options) @@ -228,18 +229,27 @@ public class SelectStatement implements CQLStatement public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException { - DataLimits limit = getLimit(options); + return getQuery(options, nowInSec, getLimit(options)); + } + + public ReadQuery getQuery(QueryOptions 
options, int nowInSec, int userLimit) throws RequestValidationException + { + DataLimits limit = getDataLimits(userLimit); if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) return getRangeCommand(options, limit, nowInSec); return getSliceCommands(options, limit, nowInSec); } - private ResultMessage.Rows execute(ReadQuery query, QueryOptions options, QueryState state, int nowInSec) throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows execute(ReadQuery query, + QueryOptions options, + QueryState state, + int nowInSec, + int userLimit) throws RequestValidationException, RequestExecutionException { try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState())) { - return processResults(data, options, nowInSec); + return processResults(data, options, nowInSec, userLimit); } } @@ -310,8 +320,11 @@ public class SelectStatement implements CQLStatement } } - private ResultMessage.Rows execute(Pager pager, QueryOptions options, int pageSize, int nowInSec) - throws RequestValidationException, RequestExecutionException + private ResultMessage.Rows execute(Pager pager, + QueryOptions options, + int pageSize, + int nowInSec, + int userLimit) throws RequestValidationException, RequestExecutionException { if (selection.isAggregate()) return pageAggregateQuery(pager, options, pageSize, nowInSec); @@ -324,7 +337,7 @@ public class SelectStatement implements CQLStatement ResultMessage.Rows msg; try (PartitionIterator page = pager.fetchPage(pageSize)) { - msg = processResults(page, options, nowInSec); + msg = processResults(page, options, nowInSec, userLimit); } // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this @@ -366,16 +379,20 @@ public class SelectStatement implements CQLStatement return new ResultMessage.Rows(result.build(options.getProtocolVersion())); } - private ResultMessage.Rows processResults(PartitionIterator partitions, 
QueryOptions options, int nowInSec) throws RequestValidationException + private ResultMessage.Rows processResults(PartitionIterator partitions, + QueryOptions options, + int nowInSec, + int userLimit) throws RequestValidationException { - ResultSet rset = process(partitions, options, nowInSec); + ResultSet rset = process(partitions, options, nowInSec, userLimit); return new ResultMessage.Rows(rset); } public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { int nowInSec = FBUtilities.nowInSeconds(); - ReadQuery query = getQuery(options, nowInSec); + int userLimit = getLimit(options); + ReadQuery query = getQuery(options, nowInSec, userLimit); int pageSize = getPageSize(options); try (ReadOrderGroup orderGroup = query.startOrderGroup()) @@ -384,20 +401,20 @@ public class SelectStatement implements CQLStatement { try (PartitionIterator data = query.executeInternal(orderGroup)) { - return processResults(data, options, nowInSec); + return processResults(data, options, nowInSec, userLimit); } } else { QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion()); - return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec); + return execute(Pager.forInternalQuery(pager, orderGroup), options, pageSize, nowInSec, userLimit); } } } public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException { - return process(partitions, QueryOptions.DEFAULT, nowInSec); + return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT)); } public String keyspace() @@ -549,18 +566,37 @@ public class SelectStatement implements CQLStatement return builder.build(); } - /** - * May be used by custom QueryHandler implementations - */ - public DataLimits getLimit(QueryOptions options) throws InvalidRequestException + private DataLimits getDataLimits(int userLimit) { - int userLimit = -1; + 
int cqlRowLimit = DataLimits.NO_LIMIT; + // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever // return 1 result and can therefore basically ignore the user LIMIT in this case. // Whenever we support GROUP BY, we'll have to add a new DataLimits kind that knows how things are grouped and is thus // able to apply the user limit properly. - if (limit != null && !selection.isAggregate()) + // If we do post ordering we need to get all the results sorted before we can trim them. + if (!selection.isAggregate() && !needsPostQueryOrdering()) + cqlRowLimit = userLimit; + + if (parameters.isDistinct) + return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit); + + return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.NONE : DataLimits.cqlLimits(cqlRowLimit); + } + + /** + * Returns the limit specified by the user. + * May be used by custom QueryHandler implementations + * + * @return the limit specified by the user or <code>DataLimits.NO_LIMIT</code> if no value + * has been specified. + */ + public int getLimit(QueryOptions options) + { + int userLimit = DataLimits.NO_LIMIT; + + if (limit != null) { ByteBuffer b = checkNotNull(limit.bindAndGet(options), "Invalid null value of limit"); // treat UNSET limit value as 'unlimited' @@ -578,11 +614,7 @@ public class SelectStatement implements CQLStatement } } } - - if (parameters.isDistinct) - return userLimit < 0 ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(userLimit); - - return userLimit < 0 ? 
DataLimits.NONE : DataLimits.cqlLimits(userLimit); + return userLimit; } private NavigableSet<Clustering> getRequestedRows(QueryOptions options) throws InvalidRequestException @@ -604,7 +636,10 @@ public class SelectStatement implements CQLStatement return filter; } - private ResultSet process(PartitionIterator partitions, QueryOptions options, int nowInSec) throws InvalidRequestException + private ResultSet process(PartitionIterator partitions, + QueryOptions options, + int nowInSec, + int userLimit) throws InvalidRequestException { Selection.ResultSetBuilder result = selection.resultSetBuilder(parameters.isJson); while (partitions.hasNext()) @@ -619,6 +654,8 @@ public class SelectStatement implements CQLStatement orderResults(cqlRows); + cqlRows.trim(userLimit); + return cqlRows; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 130c6ba..19f24ad 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -41,7 +41,9 @@ public abstract class DataLimits { public static final Serializer serializer = new Serializer(); - public static final DataLimits NONE = new CQLLimits(Integer.MAX_VALUE) + public static final int NO_LIMIT = Integer.MAX_VALUE; + + public static final DataLimits NONE = new CQLLimits(NO_LIMIT) { @Override public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) @@ -64,7 +66,7 @@ public abstract class DataLimits // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering. 
- public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true); + public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true); public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT } @@ -236,7 +238,7 @@ public abstract class DataLimits private CQLLimits(int rowLimit) { - this(rowLimit, Integer.MAX_VALUE); + this(rowLimit, NO_LIMIT); } private CQLLimits(int rowLimit, int perPartitionLimit) @@ -263,7 +265,7 @@ public abstract class DataLimits public boolean isUnlimited() { - return rowLimit == Integer.MAX_VALUE && perPartitionLimit == Integer.MAX_VALUE; + return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT; } public DataLimits forPaging(int pageSize) @@ -281,7 +283,7 @@ public abstract class DataLimits // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch // for both argument or just for perPartitionLimit with no limit on rowLimit). 
- return new CQLLimits(toFetch, Integer.MAX_VALUE, isDistinct); + return new CQLLimits(toFetch, NO_LIMIT, isDistinct); } public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) @@ -410,14 +412,14 @@ public abstract class DataLimits { StringBuilder sb = new StringBuilder(); - if (rowLimit != Integer.MAX_VALUE) + if (rowLimit != NO_LIMIT) { sb.append("LIMIT ").append(rowLimit); - if (perPartitionLimit != Integer.MAX_VALUE) + if (perPartitionLimit != NO_LIMIT) sb.append(' '); } - if (perPartitionLimit != Integer.MAX_VALUE) + if (perPartitionLimit != NO_LIMIT) sb.append("PER PARTITION LIMIT ").append(perPartitionLimit); return sb.toString(); @@ -508,7 +510,7 @@ public abstract class DataLimits public boolean isUnlimited() { - return partitionLimit == Integer.MAX_VALUE && cellPerPartitionLimit == Integer.MAX_VALUE; + return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT; } public DataLimits forPaging(int pageSize) http://git-wip-us.apache.org/repos/asf/cassandra/blob/7e6c1d54/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java index 73bbaca..ae6f772 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectOrderByTest.java @@ -333,6 +333,12 @@ public class SelectOrderByTest extends CQLTester assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"), row(1), row(2), row(3)); + assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 2"), + row(1), row(2)); + + assertRows(execute("SELECT col1 FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1 LIMIT 10"), + row(1), row(2), 
row(3)); + assertRows(execute("SELECT col1, my_id FROM %s WHERE my_id in('key1', 'key2', 'key3') ORDER BY col1"), row(1, "key1"), row(2, "key3"), row(3, "key2")); @@ -360,6 +366,15 @@ public class SelectOrderByTest extends CQLTester row("A"), row("D")); + assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 2; ", 1, 1, 2), + row("B"), + row("A")); + + assertRows(execute("SELECT v FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c LIMIT 10; ", 1, 1, 2), + row("B"), + row("A"), + row("D")); + assertRows(execute("SELECT v as c FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c; ", 1, 1, 2), row("B"), row("A"), @@ -390,6 +405,32 @@ public class SelectOrderByTest extends CQLTester row("B"), row("D"), row("A")); + + assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 2; ", 1, 1, 2), + row("B"), + row("D")); + + assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1, c2 LIMIT 10; ", 1, 1, 2), + row("B"), + row("D"), + row("A")); + + assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC; ", 1, 1, 2), + row("A"), + row("D"), + row("B")); + + assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 2; ", 1, 1, 2), + row("A"), + row("D")); + + assertRows(execute("SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 10; ", 1, 1, 2), + row("A"), + row("D"), + row("B")); + + assertInvalidMessage("LIMIT must be strictly positive", + "SELECT v as c2 FROM %s where pk1 = ? AND pk2 IN (?, ?) ORDER BY c1 DESC , c2 DESC LIMIT 0; ", 1, 1, 2); } /**
