Merge branch 'cassandra-1.2' into trunk
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/cql3/QueryProcessor.java
src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54efc08d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54efc08d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54efc08d
Branch: refs/heads/trunk
Commit: 54efc08d805d2b6ab7a59b0a775868ea2a6acd02
Parents: 88f65a1 0beab74
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Jul 12 17:16:32 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Jul 12 17:16:32 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/cql3/QueryProcessor.java | 13 ---
.../cql3/statements/SelectStatement.java | 110 +++++++++++++------
.../apache/cassandra/db/filter/ColumnSlice.java | 35 +-----
4 files changed, 82 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 75c5f67,fa019c0..91657fb
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -94,20 -94,7 +94,7 @@@ public class QueryProcesso
}
}
- public static void validateSliceFilter(CFMetaData metadata,
SliceQueryFilter range)
- throws InvalidRequestException
- {
- try
- {
- ColumnSlice.validate(range.slices, metadata.comparator,
range.reversed);
- }
- catch (IllegalArgumentException e)
- {
- throw new InvalidRequestException(e.getMessage());
- }
- }
-
- private static ResultMessage processStatement(CQLStatement statement,
ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
+ private static ResultMessage processStatement(CQLStatement statement,
ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int
pageSize, PagingState pageState)
throws RequestExecutionException, RequestValidationException
{
logger.trace("Process {} @CL.{}", statement, cl);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a3cf6d2,6224af6..0796252
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -145,63 -121,26 +145,77 @@@ public class SelectStatement implement
cl.validateForRead(keyspace());
-
+ int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
- Pageable command = isKeyRange || usesSecondaryIndexing
- ? getRangeCommand(variables, limit, now)
- : new
Pageable.ReadCommands(getSliceCommands(variables, limit, now));
++ Pageable command;
++ if (isKeyRange || usesSecondaryIndexing)
++ {
++ command = getRangeCommand(variables, limit, now);
++ }
++ else
++ {
++ List<ReadCommand> commands = getSliceCommands(variables, limit,
now);
++ command = commands == null ? null : new
Pageable.ReadCommands(commands);
++ }
+
+ // A count query will never be paged for the user, but we always page
it internally to avoid OOM.
+ // If we user provided a pageSize we'll use that to page internally
(because why not), otherwise we use our default
+ if (parameters.isCount && pageSize < 0)
+ pageSize = DEFAULT_COUNT_PAGE_SIZE;
+
- if (pageSize < 0 || !QueryPagers.mayNeedPaging(command, pageSize))
++ if (pageSize < 0 || command == null ||
!QueryPagers.mayNeedPaging(command, pageSize))
+ {
+ return execute(command, cl, variables, limit, now);
+ }
+ else
+ {
+ QueryPager pager = QueryPagers.pager(command, cl, pagingState);
+ if (parameters.isCount)
+ return pageCountQuery(pager, variables, pageSize, now);
+
+ List<Row> page = pager.fetchPage(pageSize);
+ ResultMessage.Rows msg = processResults(page, variables, limit,
now);
+ msg.result.metadata.setHasMorePages(pager.state());
+ return msg;
+ }
+ }
+
+ private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl,
List<ByteBuffer> variables, int limit, long now) throws
RequestValidationException, RequestExecutionException
+ {
- List<Row> rows = command instanceof Pageable.ReadCommands
- ?
StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
- :
StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+ List<Row> rows;
- if (isKeyRange)
++ if (command == null)
+ {
- RangeSliceCommand command = getRangeCommand(variables);
- rows = command == null ? Collections.<Row>emptyList() :
StorageProxy.getRangeSlice(command, cl);
++ rows = Collections.<Row>emptyList();
+ }
+ else
+ {
- List<ReadCommand> commands = getSliceCommands(variables);
- rows = commands == null ? Collections.<Row>emptyList() :
StorageProxy.read(commands, cl);
++ rows = command instanceof Pageable.ReadCommands
++ ?
StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
++ : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+ }
- return processResults(rows, variables);
+ return processResults(rows, variables, limit, now);
}
- private ResultMessage.Rows processResults(List<Row> rows,
List<ByteBuffer> variables) throws RequestValidationException
+ private ResultMessage.Rows pageCountQuery(QueryPager pager,
List<ByteBuffer> variables, int pageSize, long now) throws
RequestValidationException, RequestExecutionException
+ {
+ int count = 0;
+ while (!pager.isExhausted())
+ {
+ int maxLimit = pager.maxRemaining();
+ ResultSet rset = process(pager.fetchPage(pageSize), variables,
maxLimit, now);
+ count += rset.rows.size();
+ }
+
+ ResultSet result = ResultSet.makeCountResult(keyspace(),
columnFamily(), count, parameters.countAlias);
+ return new ResultMessage.Rows(result);
+ }
+
+ public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer>
variables, int limit, long now) throws RequestValidationException
{
// Even for count, we need to process the result as it'll group some
column together in sparse column families
- ResultSet rset = process(rows, variables);
- rset = parameters.isCount ? rset.makeCountResult() : rset;
+ ResultSet rset = process(rows, variables, limit, now);
+ rset = parameters.isCount ?
rset.makeCountResult(parameters.countAlias) : rset;
return new ResultMessage.Rows(rset);
}
@@@ -216,14 -155,31 +230,22 @@@
public ResultMessage.Rows executeInternal(QueryState state) throws
RequestExecutionException, RequestValidationException
{
- try
- {
- List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
- List<Row> rows;
- if (isKeyRange)
- {
- RangeSliceCommand command = getRangeCommand(variables);
- rows = command == null ? Collections.<Row>emptyList() :
RangeSliceVerbHandler.executeLocally(command);
- }
- else
- {
- List<ReadCommand> commands = getSliceCommands(variables);
- rows = commands == null ? Collections.<Row>emptyList() :
readLocally(keyspace(), commands);
- }
-
- return processResults(rows, variables);
- }
- catch (ExecutionException e)
+ List<ByteBuffer> variables = Collections.emptyList();
+ int limit = getLimit(variables);
+ long now = System.currentTimeMillis();
- List<Row> rows = isKeyRange || usesSecondaryIndexing
- ? getRangeCommand(variables, limit,
now).executeLocally()
- : readLocally(keyspace(), getSliceCommands(variables,
limit, now));
++ List<Row> rows;
++ if (isKeyRange || usesSecondaryIndexing)
+ {
- throw new RuntimeException(e);
++ RangeSliceCommand command = getRangeCommand(variables, limit,
now);
++ rows = command == null ? Collections.<Row>emptyList() :
command.executeLocally();
+ }
- catch (InterruptedException e)
++ else
+ {
- throw new RuntimeException(e);
++ List<ReadCommand> commands = getSliceCommands(variables, limit,
now);
++ rows = commands == null ? Collections.<Row>emptyList() :
readLocally(keyspace(), commands);
+ }
+
+ return processResults(rows, variables, limit, now);
}
public ResultSet process(List<Row> rows) throws InvalidRequestException
@@@ -247,27 -203,61 +269,36 @@@
Collection<ByteBuffer> keys = getKeys(variables);
List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
- // ...a range (slice) of column names
- if (isColumnRange())
+ IDiskAtomFilter filter = makeFilter(variables, limit);
++ if (filter == null)
++ return null;
++
+ // Note that we use the total limit for every key, which is
potentially inefficient.
+ // However, IN + LIMIT is not a very sensible choice.
+ for (ByteBuffer key : keys)
{
- // Note that we use the total limit for every key. This is
- // potentially inefficient, but then again, IN + LIMIT is not a
- // very sensible choice
- for (ByteBuffer key : keys)
- {
- QueryProcessor.validateKey(key);
- // Note that we should not share the slice filter amongst the
command, due to SliceQueryFilter not
- // being immutable due to its columnCounter used by the
lastCounted() method
- // (this is fairly ugly and we should change that but that's
probably not a tiny refactor to do that cleanly)
- SliceQueryFilter filter =
(SliceQueryFilter)makeFilter(variables);
- if (filter == null)
- return null;
-
- commands.add(new SliceFromReadCommand(keyspace(), key,
queryPath, filter));
- }
- }
- // ...of a list of column names
- else
- {
- // ByNames commands can share the filter
- IDiskAtomFilter filter = makeFilter(variables); // names filter
are never null
- for (ByteBuffer key: keys)
- {
- QueryProcessor.validateKey(key);
- commands.add(new SliceByNamesReadCommand(keyspace(), key,
queryPath, (NamesQueryFilter)filter));
- }
+ QueryProcessor.validateKey(key);
+ // We should not share the slice filter amongst the commands
(hence the cloneShallow), due to
+ // SliceQueryFilter not being immutable due to its columnCounter
used by the lastCounted() method
+ // (this is fairly ugly and we should change that but that's
probably not a tiny refactor to do that cleanly)
+ commands.add(ReadCommand.create(keyspace(), key, columnFamily(),
now, filter.cloneShallow()));
}
return commands;
}
- private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables)
throws RequestValidationException
+ private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int
limit, long now) throws RequestValidationException
{
- IDiskAtomFilter filter = makeFilter(variables);
+ IDiskAtomFilter filter = makeFilter(variables, limit);
+ if (filter == null)
+ return null;
+
List<IndexExpression> expressions = getIndexExpressions(variables);
// The LIMIT provided by the user is the number of CQL row he wants
returned.
// We want to have getRangeSlice to count the number of columns, not
the number of keys.
- return new RangeSliceCommand(keyspace(), columnFamily(), now,
filter, getKeyBounds(variables), expressions, limit, true, false);
+ AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+ return keyBounds == null
+ ? null
- : new RangeSliceCommand(keyspace(),
- columnFamily(),
- null,
- filter,
- keyBounds,
- expressions,
- getLimit(),
- true,
- false);
++ : new RangeSliceCommand(keyspace(), columnFamily(), now,
filter, keyBounds, expressions, limit, true, false);
}
private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer>
variables) throws InvalidRequestException
@@@ -306,15 -309,14 +350,14 @@@
}
else
{
- bounds = includeKeyBound(Bound.END)
- ? new Range<RowPosition>(startKey, finishKey)
- : new ExcludingBounds<RowPosition>(startKey, finishKey);
+ return includeKeyBound(Bound.END)
+ ? new Range<RowPosition>(startKey, finishKey)
+ : new ExcludingBounds<RowPosition>(startKey, finishKey);
}
}
- return bounds;
}
- private IDiskAtomFilter makeFilter(List<ByteBuffer> variables)
+ private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
throws InvalidRequestException
{
if (isColumnRange())
@@@ -326,11 -328,16 +369,14 @@@
int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
ColumnSlice slice = new
ColumnSlice(getRequestedBound(Bound.START, variables),
getRequestedBound(Bound.END,
variables));
+
+ if (slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+ return null;
+
SliceQueryFilter filter = new SliceQueryFilter(new
ColumnSlice[]{slice},
isReversed,
- getLimit(),
- toGroup,
- multiplier);
+ limit,
+ toGroup);
- QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
return filter;
}
else
@@@ -1042,16 -1052,9 +1088,16 @@@
if (stmt.onToken)
throw new InvalidRequestException("The token()
function must be applied to all partition key components or none of them");
- // Under a non order perserving partitioner, the only
time not restricting a key part is allowed is if none are restricted
- if (!partitioner.preservesOrder() && i > 0 &&
stmt.keyRestrictions[i-1] != null)
+ // The only time not restricting a key part is allowed is
if none are restricted
+ if (i > 0 && stmt.keyRestrictions[i-1] != null)
+ {
+ if (hasQueriableIndex)
+ {
+ stmt.usesSecondaryIndexing = true;
+ break;
+ }
throw new
InvalidRequestException(String.format("Partition key part %s must be restricted
since preceding part is", cname));
+ }
stmt.isKeyRange = true;
shouldBeDone = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 49503b5,208e181..e0c576e
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@@ -47,41 -47,46 +47,10 @@@ public class ColumnSlic
this.finish = finish;
}
-- /**
-- * Validate an array of column slices.
-- * To be valid, the slices must be sorted and non-overlapping and each
slice must be valid.
-- *
-- * @throws IllegalArgumentException if the input slices are not valid.
-- */
- public static void validate(ColumnSlice[] slices, AbstractType<?>
comparator, boolean reversed)
- {
- for (int i = 0; i < slices.length; i++)
- {
- ColumnSlice slice = slices[i];
- validate(slice, comparator, reversed);
- if (i > 0)
- {
- if (slices[i - 1].finish.remaining() == 0 ||
slice.start.remaining() == 0)
- throw new IllegalArgumentException("Invalid column
slices: slices must be sorted and non-overlapping");
-
- int cmp = comparator.compare(slices[i -1].finish,
slice.start);
- if (reversed ? cmp <= 0 : cmp >= 0)
- throw new IllegalArgumentException("Invalid column
slices: slices must be sorted and non-overlapping");
- }
- }
- }
-
- /**
- * Validate a column slices.
- * To be valid, the slice start must sort before the slice end.
- *
- * @throws IllegalArgumentException if the slice is not valid.
- */
- public static void validate(ColumnSlice slice, AbstractType<?>
comparator, boolean reversed)
- //public static void validate(ColumnSlice[] slices, AbstractType<?>
comparator, boolean reversed)
- //{
- // for (int i = 0; i < slices.length; i++)
- // {
- // ColumnSlice slice = slices[i];
- // validate(slice, comparator, reversed);
- // if (i > 0)
- // {
- // if (slices[i - 1].finish.remaining() == 0 ||
slice.start.remaining() == 0)
- // throw new IllegalArgumentException("Invalid column
slices: slices must be sorted and non-overlapping");
-
- // int cmp = comparator.compare(slices[i -1].finish,
slice.start);
- // if (reversed ? cmp <= 0 : cmp >= 0)
- // throw new IllegalArgumentException("Invalid column
slices: slices must be sorted and non-overlapping");
- // }
- // }
- //}
-
- /**
- * Validate a column slices.
- * To be valid, the slice start must sort before the slice end.
- *
- * @throws IllegalArgumentException if the slice is not valid.
- */
- //public static void validate(ColumnSlice slice, AbstractType<?>
comparator, boolean reversed)
- //{
- // if (slice.isAlwaysEmpty(comparator, reversed))
- // throw new IllegalArgumentException("Slice finish must come
after start in traversal order");
- //}
-
+ public boolean isAlwaysEmpty(AbstractType<?> comparator, boolean reversed)
{
Comparator<ByteBuffer> orderedComparator = reversed ?
comparator.reverseComparator : comparator;
- if (slice.start.remaining() > 0 && slice.finish.remaining() > 0 &&
orderedComparator.compare(slice.start, slice.finish) > 0)
- throw new IllegalArgumentException("Slice finish must come after
start in traversal order");
+ return (start.remaining() > 0 && finish.remaining() > 0 &&
orderedComparator.compare(start, finish) > 0);
}
public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)