Updated Branches: refs/heads/cassandra-1.1 1ab4ec174 -> 8b81c8f2f
Allow paging through non-ordered partitioner results in CQL3 patch by slebresne; reviewed by xedin for CASSANDRA-3771 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8b81c8f2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8b81c8f2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8b81c8f2 Branch: refs/heads/cassandra-1.1 Commit: 8b81c8f2fb93a051fbd317c03af727ab919034f2 Parents: 1ab4ec1 Author: Sylvain Lebresne <[email protected]> Authored: Mon May 7 13:47:11 2012 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Mon May 7 13:47:11 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cql3/Cql.g | 7 + src/java/org/apache/cassandra/cql3/Relation.java | 13 +- src/java/org/apache/cassandra/cql3/Term.java | 70 ++++--- .../cassandra/cql3/statements/SelectStatement.java | 157 +++++++++------ 5 files changed, 152 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0c8e022..baf899f 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -35,6 +35,7 @@ * (cql3) Move max/min compaction thresholds to compaction strategy options (CASSANDRA-4187) * Fix exception during move when localhost is the only source (CASSANDRA-4200) + * (cql3) Allow paging through non-ordered partitioner results (CASSANDRA-3771) Merged from 1.0: * Fix super columns bug where cache is not updated (CASSANDRA-4190) * fix maxTimestamp to include row tombstones (CASSANDRA-4116) http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Cql.g ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/cql3/Cql.g b/src/java/org/apache/cassandra/cql3/Cql.g index bcfac8a..f567958 100644 --- a/src/java/org/apache/cassandra/cql3/Cql.g +++ b/src/java/org/apache/cassandra/cql3/Cql.g @@ -461,6 +461,11 @@ cidentList returns [List<ColumnIdentifier> items] ; // Values (includes prepared statement markers) +extendedTerm returns [Term term] + : K_TOKEN '(' t=term ')' { $term = Term.tokenOf(t); } + | t=term { $term = t; } + ; + term returns [Term term] : t=(STRING_LITERAL | UUID | INTEGER | FLOAT ) { $term = new Term($t.text, $t.type); } | t=QMARK { $term = new Term($t.text, $t.type, ++currentBindMarkerIdx); } @@ -502,6 +507,7 @@ properties returns [Map<String, String> props] relation returns [Relation rel] : name=cident type=('=' | '<' | '<=' | '>=' | '>') t=term { $rel = new Relation($name.id, $type.text, $t.term); } + | K_TOKEN '(' name=cident ')' type=('=' |'<' | '<=' | '>=' | '>') t=extendedTerm { $rel = new Relation($name.id, $type.text, $t.term, true); } | name=cident K_IN { $rel = Relation.createInRelation($name.id); } '(' f1=term { $rel.addInValue(f1); } (',' fN=term { $rel.addInValue(fN); } )* ')' ; @@ -615,6 +621,7 @@ K_UUID: U U I D; K_VARCHAR: V A R C H A R; K_VARINT: V A R I N T; K_TIMEUUID: T I M E U U I D; +K_TOKEN: T O K E N; // Case-insensitive alpha characters fragment A: ('a'|'A'); http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Relation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java index 0724abd..e7bf616 100644 --- a/src/java/org/apache/cassandra/cql3/Relation.java +++ b/src/java/org/apache/cassandra/cql3/Relation.java @@ -32,6 +32,7 @@ public class Relation private final Type relationType; private final Term value; private final List<Term> inValues; + public final boolean onToken; public static enum Type { @@ -54,12 +55,13 @@ public 
class Relation } } - private Relation(ColumnIdentifier entity, Type type, Term value, List<Term> inValues) + private Relation(ColumnIdentifier entity, Type type, Term value, List<Term> inValues, boolean onToken) { this.entity = entity; this.relationType = type; this.value = value; this.inValues = inValues; + this.onToken = onToken; } /** @@ -71,12 +73,17 @@ public class Relation */ public Relation(ColumnIdentifier entity, String type, Term value) { - this(entity, Type.forString(type), value, null); + this(entity, Type.forString(type), value, null, false); + } + + public Relation(ColumnIdentifier entity, String type, Term value, boolean onToken) + { + this(entity, Type.forString(type), value, null, onToken); } public static Relation createInRelation(ColumnIdentifier entity) { - return new Relation(entity, Type.IN, null, new ArrayList<Term>()); + return new Relation(entity, Type.IN, null, new ArrayList<Term>(), false); } public Type operator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/Term.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Term.java b/src/java/org/apache/cassandra/cql3/Term.java index 07e1625..c86f0e2 100644 --- a/src/java/org/apache/cassandra/cql3/Term.java +++ b/src/java/org/apache/cassandra/cql3/Term.java @@ -21,6 +21,9 @@ package org.apache.cassandra.cql3; import java.nio.ByteBuffer; import java.util.List; +import org.apache.cassandra.config.ConfigurationException; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.FloatType; @@ -35,12 +38,19 @@ public class Term private final String text; private final TermType type; public final int bindIndex; + public final boolean isToken; - public Term(String text, TermType type) + private Term(String text, 
TermType type, int bindIndex, boolean isToken) { this.text = text == null ? "" : text; this.type = type; - this.bindIndex = -1; + this.bindIndex = bindIndex; + this.isToken = isToken; + } + + public Term(String text, TermType type) + { + this(text, type, -1, false); } /** @@ -62,9 +72,12 @@ public class Term public Term(String text, int type, int index) { - this.text = text == null ? "" : text; - this.type = TermType.forInt(type); - this.bindIndex = index; + this(text, TermType.forInt(type), index, false); + } + + public static Term tokenOf(Term t) + { + return new Term(t.text, t.type, t.bindIndex, true); } /** @@ -74,7 +87,7 @@ public class Term */ public String getText() { - return text; + return isToken ? "token(" + text + ")" : text; } /** @@ -105,29 +118,28 @@ public class Term } } - /** - * Returns the typed value, serialized to a ByteBuffer. - * - * @return a ByteBuffer of the value. - * @throws InvalidRequestException if unable to coerce the string to its type. - */ - public ByteBuffer getByteBuffer() throws InvalidRequestException + public Token getAsToken(AbstractType<?> validator, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException { - switch (type) + if (!(isToken || type == TermType.STRING)) + throw new InvalidRequestException("Invalid value for token (use a string literal of the token value or the token() function)"); + + try { - case STRING: - return AsciiType.instance.fromString(text); - case INTEGER: - return IntegerType.instance.fromString(text); - case UUID: - // we specifically want the Lexical class here, not "UUIDType," because we're supposed to have - // a uuid-shaped string here, and UUIDType also accepts integer or date strings (and turns them into version 1 uuids). 
- return LexicalUUIDType.instance.fromString(text); - case FLOAT: - return FloatType.instance.fromString(text); + if (isToken) + { + ByteBuffer value = getByteBuffer(validator, variables); + return p.getToken(value); + } + else + { + p.getTokenFactory().validate(text); + return p.getTokenFactory().fromString(text); + } + } + catch (ConfigurationException e) + { + throw new InvalidRequestException(e.getMessage()); } - - throw new IllegalStateException(); } /** @@ -148,14 +160,14 @@ public class Term @Override public String toString() { - return String.format("Term(%s, type=%s)", getText(), type); + return String.format("Term(%s, type=%s%s)", getText(), type, isToken ? ", isToken" : ""); } @Override public int hashCode() { final int prime = 31; - int result = 1; + int result = 1 + (isToken ? 1 : 0); result = prime * result + ((text == null) ? 0 : text.hashCode()); result = prime * result + ((type == null) ? 0 : type.hashCode()); return result; @@ -180,6 +192,8 @@ public class Term return false; if (type != other.type) return false; + if (isToken != other.isToken) + return false; return true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/8b81c8f2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index b9d1c4f..04f16cc 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -46,13 +46,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.db.marshal.ReversedType; import org.apache.cassandra.db.marshal.TypeParser; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.Bounds; -import 
org.apache.cassandra.dht.ExcludingBounds; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.IncludingExcludingBounds; -import org.apache.cassandra.dht.RandomPartitioner; -import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; @@ -250,33 +244,6 @@ public class SelectStatement implements CQLStatement private List<Row> multiRangeSlice(List<ByteBuffer> variables) throws InvalidRequestException, TimedOutException, UnavailableException { List<Row> rows; - IPartitioner<?> p = StorageService.getPartitioner(); - - ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables); - ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables); - - RowPosition startKey = RowPosition.forKey(startKeyBytes, p); - RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p); - if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p)) - { - if (p instanceof RandomPartitioner) - throw new InvalidRequestException("Start key sorts after end key. This is not allowed; you probably should not specify end key at all, under RandomPartitioner"); - else - throw new InvalidRequestException("Start key must sort before (or equal to) finish key in your partitioner!"); - } - AbstractBounds<RowPosition> bounds; - if (includeKeyBound(Bound.START)) - { - bounds = includeKeyBound(Bound.END) - ? new Bounds<RowPosition>(startKey, finishKey) - : new IncludingExcludingBounds<RowPosition>(startKey, finishKey); - } - else - { - bounds = includeKeyBound(Bound.END) - ? new Range<RowPosition>(startKey, finishKey) - : new ExcludingBounds<RowPosition>(startKey, finishKey); - } // XXX: Our use of Thrift structs internally makes me Sad. 
:( SlicePredicate thriftSlicePredicate = makeSlicePredicate(variables); @@ -290,7 +257,7 @@ public class SelectStatement implements CQLStatement columnFamily(), null, thriftSlicePredicate, - bounds, + getKeyBounds(variables), expressions, getLimit(), true, // limit by columns, not keys @@ -308,6 +275,50 @@ public class SelectStatement implements CQLStatement return rows; } + private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> variables) throws InvalidRequestException + { + IPartitioner<?> p = StorageService.getPartitioner(); + AbstractBounds<RowPosition> bounds; + + if (keyRestriction != null && keyRestriction.onToken) + { + Token startToken = getTokenBound(Bound.START, variables, p); + Token endToken = getTokenBound(Bound.END, variables, p); + + RowPosition start = includeKeyBound(Bound.START) ? startToken.minKeyBound() : startToken.maxKeyBound(); + RowPosition end = includeKeyBound(Bound.END) ? endToken.maxKeyBound() : endToken.minKeyBound(); + bounds = new Range<RowPosition>(start, end); + } + else + { + ByteBuffer startKeyBytes = getKeyBound(Bound.START, variables); + ByteBuffer finishKeyBytes = getKeyBound(Bound.END, variables); + + RowPosition startKey = RowPosition.forKey(startKeyBytes, p); + RowPosition finishKey = RowPosition.forKey(finishKeyBytes, p); + if (startKey.compareTo(finishKey) > 0 && !finishKey.isMinimum(p)) + { + if (p instanceof RandomPartitioner) + throw new InvalidRequestException("Start key sorts after end key. This is not allowed; you probably should not specify end key at all, under RandomPartitioner"); + else + throw new InvalidRequestException("Start key must sort before (or equal to) finish key in your partitioner!"); + } + if (includeKeyBound(Bound.START)) + { + bounds = includeKeyBound(Bound.END) + ? new Bounds<RowPosition>(startKey, finishKey) + : new IncludingExcludingBounds<RowPosition>(startKey, finishKey); + } + else + { + bounds = includeKeyBound(Bound.END) + ? 
new Range<RowPosition>(startKey, finishKey) + : new ExcludingBounds<RowPosition>(startKey, finishKey); + } + } + return bounds; + } + private SlicePredicate makeSlicePredicate(List<ByteBuffer> variables) throws InvalidRequestException { @@ -342,11 +353,11 @@ public class SelectStatement implements CQLStatement private boolean isKeyRange() { - // If indexed columns, they always use getRangeSlices + // If indexed columns or a token range, they always use getRangeSlices if (!metadataRestrictions.isEmpty()) return true; - return keyRestriction == null || !keyRestriction.isEquality(); + return keyRestriction == null || !keyRestriction.isEquality() || keyRestriction.onToken; } private Collection<ByteBuffer> getKeys(final List<ByteBuffer> variables) throws InvalidRequestException @@ -377,6 +388,21 @@ public class SelectStatement implements CQLStatement } } + private Token getTokenBound(Bound b, List<ByteBuffer> variables, IPartitioner<?> p) throws InvalidRequestException + { + assert keyRestriction != null; + if (keyRestriction.isEquality()) + { + assert keyRestriction.eqValues.size() == 1; + return keyRestriction.eqValues.get(0).getAsToken(cfDef.key.type, variables, p); + } + else + { + Term bound = keyRestriction.bound(b); + return bound == null ? 
p.getMinimumToken() : bound.getAsToken(cfDef.key.type, variables, p); + } + } + private boolean includeKeyBound(Bound b) { if (keyRestriction == null || keyRestriction.isEquality()) @@ -871,17 +897,17 @@ public class SelectStatement implements CQLStatement switch (name.kind) { case KEY_ALIAS: - if (rel.operator() != Relation.Type.EQ && rel.operator() != Relation.Type.IN && !StorageService.getPartitioner().preservesOrder()) - throw new InvalidRequestException("Only EQ and IN relation are supported on first component of the PRIMARY KEY for RandomPartitioner"); - stmt.keyRestriction = updateRestriction(name.name, stmt.keyRestriction, rel); + if (rel.operator() != Relation.Type.EQ && rel.operator() != Relation.Type.IN && !rel.onToken && !StorageService.getPartitioner().preservesOrder()) + throw new InvalidRequestException("Only EQ and IN relation are supported on first component of the PRIMARY KEY for RandomPartitioner (unless you use the token() function)"); + stmt.keyRestriction = updateRestriction(name, stmt.keyRestriction, rel); break; case COLUMN_ALIAS: - stmt.columnRestrictions[name.position] = updateRestriction(name.name, stmt.columnRestrictions[name.position], rel); + stmt.columnRestrictions[name.position] = updateRestriction(name, stmt.columnRestrictions[name.position], rel); break; case VALUE_ALIAS: throw new InvalidRequestException(String.format("Restricting the value of a compact CF (%s) is not supported", name.name)); case COLUMN_METADATA: - stmt.metadataRestrictions.put(name, updateRestriction(name.name, stmt.metadataRestrictions.get(name), rel)); + stmt.metadataRestrictions.put(name, updateRestriction(name, stmt.metadataRestrictions.get(name), rel)); break; } } @@ -941,21 +967,9 @@ public class SelectStatement implements CQLStatement if (!hasEq) throw new InvalidRequestException("No indexed columns present in by-columns clause with Equal operator"); - // If we have indexed columns and the key = X clause, we transform it into a key >= X AND key <= X 
clause. - // If it's a IN relation however, we reject it. - if (stmt.keyRestriction != null && stmt.keyRestriction.isEquality()) - { - if (stmt.keyRestriction.eqValues.size() > 1) - throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported"); - - Restriction newRestriction = new Restriction(); - for (Bound b : Bound.values()) - { - newRestriction.setBound(b, stmt.keyRestriction.eqValues.get(0)); - newRestriction.setInclusive(b); - } - stmt.keyRestriction = newRestriction; - } + // If we have indexed columns and the key = X clause, we will do a range query, but if it's an IN relation, we don't know how to handle it. + if (stmt.keyRestriction != null && stmt.keyRestriction.isEquality() && stmt.keyRestriction.eqValues.size() > 1) + throw new InvalidRequestException("Select on indexed columns and with IN clause for the PRIMARY KEY are not supported"); } if (!stmt.parameters.orderings.isEmpty()) @@ -1008,6 +1022,10 @@ public class SelectStatement implements CQLStatement throw new InvalidRequestException("Descending order is only supported is the first part of the PRIMARY KEY is restricted by an Equal or a IN"); } + // If this is a query on tokens, it's necessarily a range query (there can be more than one key per token), so reject IN queries (as we don't know how to do them) + if (stmt.keyRestriction != null && stmt.keyRestriction.onToken && stmt.keyRestriction.isEquality() && stmt.keyRestriction.eqValues.size() > 1) + throw new InvalidRequestException("Select using the token() function don't support IN clause"); + return new ParsedStatement.Prepared(stmt, Arrays.<AbstractType<?>>asList(types)); } @@ -1016,14 +1034,17 @@ public class SelectStatement implements CQLStatement return name.type instanceof ReversedType; } - Restriction updateRestriction(ColumnIdentifier name, Restriction restriction, Relation newRel) throws InvalidRequestException + Restriction updateRestriction(CFDefinition.Name name, Restriction 
restriction, Relation newRel) throws InvalidRequestException { + if (newRel.onToken && name.kind != CFDefinition.Name.Kind.KEY_ALIAS) + throw new InvalidRequestException(String.format("The token() function is only supported on the partition key, found on %s", name)); + switch (newRel.operator()) { case EQ: if (restriction != null) throw new InvalidRequestException(String.format("%s cannot be restricted by more than one relation if it includes an Equal", name)); - restriction = new Restriction(newRel.getValue()); + restriction = new Restriction(newRel.getValue(), newRel.onToken); break; case IN: if (restriction != null) @@ -1035,8 +1056,8 @@ public class SelectStatement implements CQLStatement case LT: case LTE: if (restriction == null) - restriction = new Restriction(); - restriction.setBound(name, newRel.operator(), newRel.getValue()); + restriction = new Restriction(newRel.onToken); + restriction.setBound(name.name, newRel.operator(), newRel.getValue()); break; } return restriction; @@ -1066,23 +1087,27 @@ public class SelectStatement implements CQLStatement private final Term[] bounds; private final boolean[] boundInclusive; + final boolean onToken; + Restriction(List<Term> values) { this.eqValues = values; this.bounds = null; this.boundInclusive = null; + this.onToken = false; } - Restriction(Term value) + Restriction(Term value, boolean onToken) { this(Collections.singletonList(value)); } - Restriction() + Restriction(boolean onToken) { this.eqValues = null; this.bounds = new Term[2]; this.boundInclusive = new boolean[2]; + this.onToken = onToken; } boolean isEquality() @@ -1167,17 +1192,19 @@ public class SelectStatement implements CQLStatement @Override public String toString() { + String s; if (eqValues == null) { - return String.format("SLICE(%s %s, %s %s)", boundInclusive[0] ? ">=" : ">", + s = String.format("SLICE(%s %s, %s %s)", boundInclusive[0] ? ">=" : ">", bounds[0], boundInclusive[1] ? 
"<=" : "<", bounds[1]); } else { - return String.format("EQ(%s)", eqValues); + s = String.format("EQ(%s)", eqValues); } + return onToken ? s + "*" : s; } }
