Repository: cassandra Updated Branches: refs/heads/trunk bd633377a -> 2fbddbd99
Allow per partition limit in SELECT queries Patch by Alex Petrov; reviewed by Sylvain Lebresne for CASSANDRA-7017. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2fbddbd9 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2fbddbd9 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2fbddbd9 Branch: refs/heads/trunk Commit: 2fbddbd9926eac07031196f74c637730a3953dce Parents: bd63337 Author: Alex Petrov <[email protected]> Authored: Tue Apr 5 17:45:59 2016 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Thu Apr 7 11:41:49 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/antlr/Lexer.g | 2 + src/antlr/Parser.g | 6 +- .../cql3/statements/CreateViewStatement.java | 2 +- .../cql3/statements/SelectStatement.java | 57 ++++++++---- .../apache/cassandra/db/filter/DataLimits.java | 15 +++- .../cql3/validation/operations/SelectTest.java | 95 +++++++++++++++++++- 7 files changed, 155 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e522035..4631178 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.6 + * Allow per-partition LIMIT clause in CQL (CASSANDRA-7017) * Make custom filtering more extensible with UserExpression (CASSANDRA-11295) * Improve field-checking and error reporting in cassandra.yaml (CASSANDRA-10649) * Print CAS stats in nodetool proxyhistograms (CASSANDRA-11507) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/antlr/Lexer.g ---------------------------------------------------------------------- diff --git a/src/antlr/Lexer.g b/src/antlr/Lexer.g index c73ccaa..d93a5eb 100644 --- a/src/antlr/Lexer.g +++ b/src/antlr/Lexer.g @@ -69,6 +69,8 @@ K_INSERT: I N S E R T; K_UPDATE: U P D A T E; K_WITH: W I T H; K_LIMIT: L I M I T; +K_PER: P E R; +K_PARTITION: P A R T I T I O N; K_USING: U S I N G; K_USE: U S E; K_DISTINCT: D I S T I N C T; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/antlr/Parser.g ---------------------------------------------------------------------- diff --git a/src/antlr/Parser.g b/src/antlr/Parser.g index 78a2d0d..0b21775 100644 --- a/src/antlr/Parser.g +++ b/src/antlr/Parser.g @@ -221,6 +221,7 @@ selectStatement returns [SelectStatement.RawStatement expr] @init { boolean isDistinct = false; Term.Raw limit = null; + Term.Raw perPartitionLimit = null; Map<ColumnIdentifier.Raw, Boolean> orderings = new LinkedHashMap<ColumnIdentifier.Raw, Boolean>(); boolean allowFiltering = false; boolean isJson = false; @@ -231,6 +232,7 @@ selectStatement returns [SelectStatement.RawStatement expr] K_FROM cf=columnFamilyName ( K_WHERE wclause=whereClause )? ( K_ORDER K_BY orderByClause[orderings] ( ',' orderByClause[orderings] )* )? + ( K_PER K_PARTITION K_LIMIT rows=intValue { perPartitionLimit = rows; } )? ( K_LIMIT rows=intValue { limit = rows; } )? ( K_ALLOW K_FILTERING { allowFiltering = true; } )? { @@ -239,7 +241,7 @@ selectStatement returns [SelectStatement.RawStatement expr] allowFiltering, isJson); WhereClause where = wclause == null ? WhereClause.empty() : wclause.build(); - $expr = new SelectStatement.RawStatement(cf, params, sclause, where, limit); + $expr = new SelectStatement.RawStatement(cf, params, sclause, where, limit, perPartitionLimit); } ; @@ -1570,5 +1572,7 @@ basic_unreserved_keyword returns [String str] | K_CALLED | K_INPUT | K_LIKE + | K_PER + | K_PARTITION ) { $str = $k.text; } ; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java index 5af4887..909fe4f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java @@ -195,7 +195,7 @@ public class CreateViewStatement extends SchemaAlteringStatement // build the select statement Map<ColumnIdentifier.Raw, Boolean> orderings = Collections.emptyMap(); SelectStatement.Parameters parameters = new SelectStatement.Parameters(orderings, false, true, false); - SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null); + SelectStatement.RawStatement rawSelect = new SelectStatement.RawStatement(baseName, parameters, selectClause, whereClause, null, null); ClientState state = ClientState.forInternalCalls(); state.setKeyspace(keyspace()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 2f4d468..9b68d7a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -60,6 +60,7 @@ import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNull; import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; import static org.apache.cassandra.utils.ByteBufferUtil.UNSET_BYTE_BUFFER; @@ -83,6 +84,7 @@ public class SelectStatement implements CQLStatement public final Parameters parameters; private final Selection selection; private final Term limit; + private final Term perPartitionLimit; private final StatementRestrictions restrictions; @@ -105,7 +107,8 @@ public class SelectStatement implements CQLStatement StatementRestrictions restrictions, boolean isReversed, Comparator<List<ByteBuffer>> orderingComparator, - Term limit) + Term limit, + Term perPartitionLimit) { this.cfm = cfm; this.boundTerms = boundTerms; @@ -115,6 +118,7 @@ public class SelectStatement implements CQLStatement this.orderingComparator = orderingComparator; this.parameters = parameters; this.limit = limit; + this.perPartitionLimit = perPartitionLimit; this.queriedColumns = gatherQueriedColumns(); } @@ -122,7 +126,8 @@ public class SelectStatement implements CQLStatement { return Iterables.concat(selection.getFunctions(), restrictions.getFunctions(), - limit != null ? limit.getFunctions() : Collections.<Function>emptySet()); + limit != null ? limit.getFunctions() : Collections.<Function>emptySet(), + perPartitionLimit != null ? perPartitionLimit.getFunctions() : Collections.<Function>emptySet()); } // Note that the queried columns internally is different from the one selected by the @@ -154,6 +159,7 @@ public class SelectStatement implements CQLStatement StatementRestrictions.empty(StatementType.SELECT, cfm), false, null, + null, null); } @@ -197,8 +203,9 @@ public class SelectStatement implements CQLStatement cl.validateForRead(keyspace()); int nowInSec = FBUtilities.nowInSeconds(); - int userLimit = getLimit(options); - ReadQuery query = getQuery(options, nowInSec, userLimit); + int userLimit = getLimit(limit, options); + int userPerPartitionLimit = getLimit(perPartitionLimit, options); + ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit); int pageSize = getPageSize(options); @@ -224,12 +231,12 @@ public class SelectStatement implements CQLStatement public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException { - return getQuery(options, nowInSec, getLimit(options)); + return getQuery(options, nowInSec, getLimit(limit, options), getLimit(perPartitionLimit, options)); } - public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit) throws RequestValidationException + public ReadQuery getQuery(QueryOptions options, int nowInSec, int userLimit, int perPartitionLimit) throws RequestValidationException { - DataLimits limit = getDataLimits(userLimit); + DataLimits limit = getDataLimits(userLimit, perPartitionLimit); if (restrictions.isKeyRange() || restrictions.usesSecondaryIndexing()) return getRangeCommand(options, limit, nowInSec); @@ -386,8 +393,9 @@ public class SelectStatement implements CQLStatement public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException { int nowInSec = FBUtilities.nowInSeconds(); - int userLimit = getLimit(options); - ReadQuery query = getQuery(options, nowInSec, userLimit); + int userLimit = getLimit(limit, options); + int userPerPartitionLimit = getLimit(perPartitionLimit, options); + ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit); int pageSize = getPageSize(options); try (ReadExecutionController executionController = query.executionController()) @@ -409,7 +417,7 @@ public class SelectStatement implements CQLStatement public ResultSet process(PartitionIterator partitions, int nowInSec) throws InvalidRequestException { - return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(QueryOptions.DEFAULT)); + return process(partitions, QueryOptions.DEFAULT, nowInSec, getLimit(limit, QueryOptions.DEFAULT)); } public String keyspace() @@ -576,9 +584,10 @@ public class SelectStatement implements CQLStatement return builder.build(); } - private DataLimits getDataLimits(int userLimit) + private DataLimits getDataLimits(int userLimit, int perPartitionLimit) { int cqlRowLimit = DataLimits.NO_LIMIT; + int cqlPerPartitionLimit = DataLimits.NO_LIMIT; // If we aggregate, the limit really apply to the number of rows returned to the user, not to what is queried, and // since in practice we currently only aggregate at top level (we have no GROUP BY support yet), we'll only ever @@ -587,12 +596,15 @@ public class SelectStatement implements CQLStatement // able to apply the user limit properly. // If we do post ordering we need to get all the results sorted before we can trim them. if (!selection.isAggregate() && !needsPostQueryOrdering()) + { cqlRowLimit = userLimit; + cqlPerPartitionLimit = perPartitionLimit; + } if (parameters.isDistinct) return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.DISTINCT_NONE : DataLimits.distinctLimits(cqlRowLimit); - return cqlRowLimit == DataLimits.NO_LIMIT ? DataLimits.NONE : DataLimits.cqlLimits(cqlRowLimit); + return DataLimits.cqlLimits(cqlRowLimit, cqlPerPartitionLimit); } /** @@ -602,7 +614,7 @@ public class SelectStatement implements CQLStatement * @return the limit specified by the user or <code>DataLimits.NO_LIMIT</code> if no value * as been specified. */ - public int getLimit(QueryOptions options) + public int getLimit(Term limit, QueryOptions options) { int userLimit = DataLimits.NO_LIMIT; @@ -784,14 +796,20 @@ public class SelectStatement implements CQLStatement public final List<RawSelector> selectClause; public final WhereClause whereClause; public final Term.Raw limit; + public final Term.Raw perPartitionLimit; - public RawStatement(CFName cfName, Parameters parameters, List<RawSelector> selectClause, WhereClause whereClause, Term.Raw limit) + public RawStatement(CFName cfName, Parameters parameters, + List<RawSelector> selectClause, + WhereClause whereClause, + Term.Raw limit, + Term.Raw perPartitionLimit) { super(cfName); this.parameters = parameters; this.selectClause = selectClause; this.whereClause = whereClause; this.limit = limit; + this.perPartitionLimit = perPartitionLimit; } public ParsedStatement.Prepared prepare() throws InvalidRequestException @@ -811,7 +829,10 @@ public class SelectStatement implements CQLStatement StatementRestrictions restrictions = prepareRestrictions(cfm, boundNames, selection, forView); if (parameters.isDistinct) + { + checkNull(perPartitionLimit, "PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries"); validateDistinctSelection(cfm, selection, restrictions); + } Comparator<List<ByteBuffer>> orderingComparator = null; boolean isReversed = false; @@ -835,7 +856,8 @@ public class SelectStatement implements CQLStatement restrictions, isReversed, orderingComparator, - prepareLimit(boundNames)); + prepareLimit(boundNames, limit, keyspace(), limitReceiver()), + prepareLimit(boundNames, perPartitionLimit, keyspace(), limitReceiver())); return new ParsedStatement.Prepared(stmt, boundNames, boundNames.getPartitionKeyBindIndexes(cfm)); } @@ -874,12 +896,13 @@ public class SelectStatement implements CQLStatement } /** Returns a Term for the limit or null if no limit is set */ - private Term prepareLimit(VariableSpecifications boundNames) throws InvalidRequestException + private Term prepareLimit(VariableSpecifications boundNames, Term.Raw limit, + String keyspace, ColumnSpecification limitReceiver) throws InvalidRequestException { if (limit == null) return null; - Term prepLimit = limit.prepare(keyspace(), limitReceiver()); + Term prepLimit = limit.prepare(keyspace, limitReceiver); prepLimit.collectMarkerSpecification(boundNames); return prepLimit; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index f6fdcdd..ebd2ebb 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -72,12 +72,21 @@ public abstract class DataLimits public static DataLimits cqlLimits(int cqlRowLimit) { - return new CQLLimits(cqlRowLimit); + return cqlRowLimit == NO_LIMIT ? NONE : new CQLLimits(cqlRowLimit); } public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit) { - return new CQLLimits(cqlRowLimit, perPartitionLimit); + return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT + ? NONE + : new CQLLimits(cqlRowLimit, perPartitionLimit); + } + + private static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit, boolean isDistinct) + { + return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT && !isDistinct + ? NONE + : new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct); } public static DataLimits distinctLimits(int cqlRowLimit) @@ -766,7 +775,7 @@ public abstract class DataLimits int perPartitionLimit = (int)in.readUnsignedVInt(); boolean isDistinct = in.readBoolean(); if (kind == Kind.CQL_LIMIT) - return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); + return cqlLimits(rowLimit, perPartitionLimit, isDistinct); ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in); int lastRemaining = (int)in.readUnsignedVInt(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2fbddbd9/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java index 077712e..be62f6c 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/SelectTest.java @@ -2353,4 +2353,97 @@ public class SelectTest extends CQLTester "SELECT * FROM %s WHERE c CONTAINS KEY ? ALLOW FILTERING", unset()); } -} + + @Test + public void testPerPartitionLimit() throws Throwable + { + perPartitionLimitTest(false); + } + + @Test + public void testPerPartitionLimitWithCompactStorage() throws Throwable + { + perPartitionLimitTest(true); + } + + private void perPartitionLimitTest(boolean withCompactStorage) throws Throwable + { + String query = "CREATE TABLE %s (a int, b int, c int, PRIMARY KEY (a, b))"; + + if (withCompactStorage) + createTable(query + " WITH COMPACT STORAGE"); + else + createTable(query); + + for (int i = 0; i < 5; i++) + { + for (int j = 0; j < 5; j++) + { + execute("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", i, j, j); + } + } + + assertInvalidMessage("LIMIT must be strictly positive", + "SELECT * FROM %s PER PARTITION LIMIT ?", 0); + assertInvalidMessage("LIMIT must be strictly positive", + "SELECT * FROM %s PER PARTITION LIMIT ?", -1); + + assertRowsIgnoringOrder(execute("SELECT * FROM %s PER PARTITION LIMIT ?", 2), + row(0, 0, 0), + row(0, 1, 1), + row(1, 0, 0), + row(1, 1, 1), + row(2, 0, 0), + row(2, 1, 1), + row(3, 0, 0), + row(3, 1, 1), + row(4, 0, 0), + row(4, 1, 1)); + + + // Combined Per Partition and "global" limit + assertRowCount(execute("SELECT * FROM %s PER PARTITION LIMIT ? LIMIT ?", 2, 6), + 6); + + // odd amount of results + assertRowCount(execute("SELECT * FROM %s PER PARTITION LIMIT ? LIMIT ?", 2, 5), + 5); + + // IN query + assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE a IN (2,3) PER PARTITION LIMIT ?", 2), + row(2, 0, 0), + row(2, 1, 1), + row(3, 0, 0), + row(3, 1, 1)); + + assertRowCount(execute("SELECT * FROM %s WHERE a IN (2,3) PER PARTITION LIMIT ? LIMIT 3", 2), 3); + assertRowCount(execute("SELECT * FROM %s WHERE a IN (1,2,3) PER PARTITION LIMIT ? LIMIT 3", 2), 3); + + + // with restricted partition key + assertRows(execute("SELECT * FROM %s WHERE a = ? PER PARTITION LIMIT ?", 2, 3), + row(2, 0, 0), + row(2, 1, 1), + row(2, 2, 2)); + + // with ordering + assertRows(execute("SELECT * FROM %s WHERE a = ? ORDER BY b DESC PER PARTITION LIMIT ?", 2, 3), + row(2, 4, 4), + row(2, 3, 3), + row(2, 2, 2)); + + // with filtering + assertRows(execute("SELECT * FROM %s WHERE a = ? AND b > ? PER PARTITION LIMIT ? ALLOW FILTERING", 2, 0, 2), + row(2, 1, 1), + row(2, 2, 2)); + + assertRows(execute("SELECT * FROM %s WHERE a = ? AND b > ? ORDER BY b DESC PER PARTITION LIMIT ? ALLOW FILTERING", 2, 2, 2), + row(2, 4, 4), + row(2, 3, 3)); + + assertInvalidMessage("PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries", + "SELECT DISTINCT a FROM %s PER PARTITION LIMIT ?", 3); + assertInvalidMessage("PER PARTITION LIMIT is not allowed with SELECT DISTINCT queries", + "SELECT DISTINCT a FROM %s PER PARTITION LIMIT ? LIMIT ?", 3, 4); + } +} \ No newline at end of file
