Merge branch 'cassandra-1.2' into trunk

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/cql3/QueryProcessor.java
        src/java/org/apache/cassandra/cql3/statements/SelectStatement.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/54efc08d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/54efc08d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/54efc08d

Branch: refs/heads/trunk
Commit: 54efc08d805d2b6ab7a59b0a775868ea2a6acd02
Parents: 88f65a1 0beab74
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Jul 12 17:16:32 2013 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Jul 12 17:16:32 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../apache/cassandra/cql3/QueryProcessor.java   |  13 ---
 .../cql3/statements/SelectStatement.java        | 110 +++++++++++++------
 .../apache/cassandra/db/filter/ColumnSlice.java |  35 +-----
 4 files changed, 82 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 75c5f67,fa019c0..91657fb
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@@ -94,20 -94,7 +94,7 @@@ public class QueryProcesso
          }
      }
  
-     public static void validateSliceFilter(CFMetaData metadata, SliceQueryFilter range)
-     throws InvalidRequestException
-     {
-         try
-         {
-             ColumnSlice.validate(range.slices, metadata.comparator, range.reversed);
-         }
-         catch (IllegalArgumentException e)
-         {
-             throw new InvalidRequestException(e.getMessage());
-         }
-     }
- 
 -    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables)
 +    private static ResultMessage processStatement(CQLStatement statement, ConsistencyLevel cl, QueryState queryState, List<ByteBuffer> variables, int pageSize, PagingState pageState)
      throws RequestExecutionException, RequestValidationException
      {
          logger.trace("Process {} @CL.{}", statement, cl);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index a3cf6d2,6224af6..0796252
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@@ -145,63 -121,26 +145,77 @@@ public class SelectStatement implement
  
          cl.validateForRead(keyspace());
  
- 
 +        int limit = getLimit(variables);
 +        long now = System.currentTimeMillis();
-         Pageable command = isKeyRange || usesSecondaryIndexing
-                          ? getRangeCommand(variables, limit, now)
-                          : new Pageable.ReadCommands(getSliceCommands(variables, limit, now));
++        Pageable command;
++        if (isKeyRange || usesSecondaryIndexing)
++        {
++            command = getRangeCommand(variables, limit, now);
++        }
++        else
++        {
++            List<ReadCommand> commands = getSliceCommands(variables, limit, now);
++            command = commands == null ? null : new Pageable.ReadCommands(commands);
++        }
 +
 +        // A count query will never be paged for the user, but we always page it internally to avoid OOM.
 +        // If we user provided a pageSize we'll use that to page internally (because why not), otherwise we use our default
 +        if (parameters.isCount && pageSize < 0)
 +            pageSize = DEFAULT_COUNT_PAGE_SIZE;
 +
-         if (pageSize < 0 || !QueryPagers.mayNeedPaging(command, pageSize))
++        if (pageSize < 0 || command == null || !QueryPagers.mayNeedPaging(command, pageSize))
 +        {
 +            return execute(command, cl, variables, limit, now);
 +        }
 +        else
 +        {
 +            QueryPager pager = QueryPagers.pager(command, cl, pagingState);
 +            if (parameters.isCount)
 +                return pageCountQuery(pager, variables, pageSize, now);
 +
 +            List<Row> page = pager.fetchPage(pageSize);
 +            ResultMessage.Rows msg = processResults(page, variables, limit, now);
 +            msg.result.metadata.setHasMorePages(pager.state());
 +            return msg;
 +        }
 +    }
 +
 +    private ResultMessage.Rows execute(Pageable command, ConsistencyLevel cl, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException, RequestExecutionException
 +    {
-         List<Row> rows = command instanceof Pageable.ReadCommands
-                        ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
-                        : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+         List<Row> rows;
 -        if (isKeyRange)
++        if (command == null)
+         {
 -            RangeSliceCommand command = getRangeCommand(variables);
 -            rows = command == null ? Collections.<Row>emptyList() : StorageProxy.getRangeSlice(command, cl);
++            rows = Collections.<Row>emptyList();
+         }
+         else
+         {
 -            List<ReadCommand> commands = getSliceCommands(variables);
 -            rows = commands == null ? Collections.<Row>emptyList() : StorageProxy.read(commands, cl);
++            rows = command instanceof Pageable.ReadCommands
++                 ? StorageProxy.read(((Pageable.ReadCommands)command).commands, cl)
++                 : StorageProxy.getRangeSlice((RangeSliceCommand)command, cl);
+         }
  
 -        return processResults(rows, variables);
 +        return processResults(rows, variables, limit, now);
      }
  
 -    private ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables) throws RequestValidationException
 +    private ResultMessage.Rows pageCountQuery(QueryPager pager, List<ByteBuffer> variables, int pageSize, long now) throws RequestValidationException, RequestExecutionException
 +    {
 +        int count = 0;
 +        while (!pager.isExhausted())
 +        {
 +            int maxLimit = pager.maxRemaining();
 +            ResultSet rset = process(pager.fetchPage(pageSize), variables, maxLimit, now);
 +            count += rset.rows.size();
 +        }
 +
 +        ResultSet result = ResultSet.makeCountResult(keyspace(), columnFamily(), count, parameters.countAlias);
 +        return new ResultMessage.Rows(result);
 +    }
 +
 +    public ResultMessage.Rows processResults(List<Row> rows, List<ByteBuffer> variables, int limit, long now) throws RequestValidationException
      {
          // Even for count, we need to process the result as it'll group some 
column together in sparse column families
 -        ResultSet rset = process(rows, variables);
 -        rset = parameters.isCount ? rset.makeCountResult() : rset;
 +        ResultSet rset = process(rows, variables, limit, now);
 +        rset = parameters.isCount ? rset.makeCountResult(parameters.countAlias) : rset;
          return new ResultMessage.Rows(rset);
      }
  
@@@ -216,14 -155,31 +230,22 @@@
  
      public ResultMessage.Rows executeInternal(QueryState state) throws RequestExecutionException, RequestValidationException
      {
 -        try
 -        {
 -            List<ByteBuffer> variables = Collections.<ByteBuffer>emptyList();
 -            List<Row> rows;
 -            if (isKeyRange)
 -            {
 -                RangeSliceCommand command = getRangeCommand(variables);
 -                rows = command == null ? Collections.<Row>emptyList() : 
RangeSliceVerbHandler.executeLocally(command);
 -            }
 -            else
 -            {
 -                List<ReadCommand> commands = getSliceCommands(variables);
 -                rows = commands == null ? Collections.<Row>emptyList() : 
readLocally(keyspace(), commands);
 -            }
 -
 -            return processResults(rows, variables);
 -        }
 -        catch (ExecutionException e)
 +        List<ByteBuffer> variables = Collections.emptyList();
 +        int limit = getLimit(variables);
 +        long now = System.currentTimeMillis();
-         List<Row> rows = isKeyRange || usesSecondaryIndexing
-                        ? getRangeCommand(variables, limit, now).executeLocally()
-                        : readLocally(keyspace(), getSliceCommands(variables, limit, now));
++        List<Row> rows;
++        if (isKeyRange || usesSecondaryIndexing)
+         {
 -            throw new RuntimeException(e);
++            RangeSliceCommand command = getRangeCommand(variables, limit, now);
++            rows = command == null ? Collections.<Row>emptyList() : command.executeLocally();
+         }
 -        catch (InterruptedException e)
++        else
+         {
 -            throw new RuntimeException(e);
++            List<ReadCommand> commands = getSliceCommands(variables, limit, now);
++            rows = commands == null ? Collections.<Row>emptyList() : readLocally(keyspace(), commands);
+         }
 +
 +        return processResults(rows, variables, limit, now);
      }
  
      public ResultSet process(List<Row> rows) throws InvalidRequestException
@@@ -247,27 -203,61 +269,36 @@@
          Collection<ByteBuffer> keys = getKeys(variables);
          List<ReadCommand> commands = new ArrayList<ReadCommand>(keys.size());
  
 -        // ...a range (slice) of column names
 -        if (isColumnRange())
 +        IDiskAtomFilter filter = makeFilter(variables, limit);
++        if (filter == null)
++            return null;
++
 +        // Note that we use the total limit for every key, which is 
potentially inefficient.
 +        // However, IN + LIMIT is not a very sensible choice.
 +        for (ByteBuffer key : keys)
          {
 -            // Note that we use the total limit for every key. This is
 -            // potentially inefficient, but then again, IN + LIMIT is not a
 -            // very sensible choice
 -            for (ByteBuffer key : keys)
 -            {
 -                QueryProcessor.validateKey(key);
 -                // Note that we should not share the slice filter amongst the 
command, due to SliceQueryFilter not
 -                // being immutable due to its columnCounter used by the 
lastCounted() method
 -                // (this is fairly ugly and we should change that but that's 
probably not a tiny refactor to do that cleanly)
 -                SliceQueryFilter filter = 
(SliceQueryFilter)makeFilter(variables);
 -                if (filter == null)
 -                    return null;
 -
 -                commands.add(new SliceFromReadCommand(keyspace(), key, 
queryPath, filter));
 -            }
 -        }
 -        // ...of a list of column names
 -        else
 -        {
 -            // ByNames commands can share the filter
 -            IDiskAtomFilter filter = makeFilter(variables); // names filter 
are never null
 -            for (ByteBuffer key: keys)
 -            {
 -                QueryProcessor.validateKey(key);
 -                commands.add(new SliceByNamesReadCommand(keyspace(), key, 
queryPath, (NamesQueryFilter)filter));
 -            }
 +            QueryProcessor.validateKey(key);
 +            // We should not share the slice filter amongst the commands 
(hence the cloneShallow), due to
 +            // SliceQueryFilter not being immutable due to its columnCounter 
used by the lastCounted() method
 +            // (this is fairly ugly and we should change that but that's 
probably not a tiny refactor to do that cleanly)
 +            commands.add(ReadCommand.create(keyspace(), key, columnFamily(), 
now, filter.cloneShallow()));
          }
          return commands;
      }
  
 -    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables) 
throws RequestValidationException
 +    private RangeSliceCommand getRangeCommand(List<ByteBuffer> variables, int 
limit, long now) throws RequestValidationException
      {
 -        IDiskAtomFilter filter = makeFilter(variables);
 +        IDiskAtomFilter filter = makeFilter(variables, limit);
+         if (filter == null)
+             return null;
+ 
          List<IndexExpression> expressions = getIndexExpressions(variables);
          // The LIMIT provided by the user is the number of CQL row he wants 
returned.
          // We want to have getRangeSlice to count the number of columns, not 
the number of keys.
-         return new RangeSliceCommand(keyspace(), columnFamily(), now,  
filter, getKeyBounds(variables), expressions, limit, true, false);
+         AbstractBounds<RowPosition> keyBounds = getKeyBounds(variables);
+         return keyBounds == null
+              ? null
 -             : new RangeSliceCommand(keyspace(),
 -                                     columnFamily(),
 -                                     null,
 -                                     filter,
 -                                     keyBounds,
 -                                     expressions,
 -                                     getLimit(),
 -                                     true,
 -                                     false);
++             : new RangeSliceCommand(keyspace(), columnFamily(), now,  
filter, keyBounds, expressions, limit, true, false);
      }
  
      private AbstractBounds<RowPosition> getKeyBounds(List<ByteBuffer> 
variables) throws InvalidRequestException
@@@ -306,15 -309,14 +350,14 @@@
              }
              else
              {
-                 bounds = includeKeyBound(Bound.END)
-                     ? new Range<RowPosition>(startKey, finishKey)
-                     : new ExcludingBounds<RowPosition>(startKey, finishKey);
+                 return includeKeyBound(Bound.END)
+                      ? new Range<RowPosition>(startKey, finishKey)
+                      : new ExcludingBounds<RowPosition>(startKey, finishKey);
              }
          }
-         return bounds;
      }
  
 -    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables)
 +    private IDiskAtomFilter makeFilter(List<ByteBuffer> variables, int limit)
      throws InvalidRequestException
      {
          if (isColumnRange())
@@@ -326,11 -328,16 +369,14 @@@
              int toGroup = cfDef.isCompact ? -1 : cfDef.columns.size();
              ColumnSlice slice = new 
ColumnSlice(getRequestedBound(Bound.START, variables),
                                                  getRequestedBound(Bound.END, 
variables));
+ 
+             if (slice.isAlwaysEmpty(cfDef.cfm.comparator, isReversed))
+                 return null;
+ 
              SliceQueryFilter filter = new SliceQueryFilter(new 
ColumnSlice[]{slice},
                                                             isReversed,
 -                                                           getLimit(),
 -                                                           toGroup,
 -                                                           multiplier);
 +                                                           limit,
 +                                                           toGroup);
-             QueryProcessor.validateSliceFilter(cfDef.cfm, filter);
              return filter;
          }
          else
@@@ -1042,16 -1052,9 +1088,16 @@@
                      if (stmt.onToken)
                          throw new InvalidRequestException("The token() function must be applied to all partition key components or none of them");
  
-                     // Under a non order perserving partitioner, the only 
time not restricting a key part is allowed is if none are restricted
-                     if (!partitioner.preservesOrder() && i > 0 && 
stmt.keyRestrictions[i-1] != null)
+                     // The only time not restricting a key part is allowed is 
if none are restricted
+                     if (i > 0 && stmt.keyRestrictions[i-1] != null)
 +                    {
 +                        if (hasQueriableIndex)
 +                        {
 +                            stmt.usesSecondaryIndexing = true;
 +                            break;
 +                        }
                          throw new InvalidRequestException(String.format("Partition key part %s must be restricted since preceding part is", cname));
 +                    }
  
                      stmt.isKeyRange = true;
                      shouldBeDone = true;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/54efc08d/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/filter/ColumnSlice.java
index 49503b5,208e181..e0c576e
--- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java
@@@ -47,41 -47,46 +47,10 @@@ public class ColumnSlic
          this.finish = finish;
      }
  
--    /**
--     * Validate an array of column slices.
--     * To be valid, the slices must be sorted and non-overlapping and each 
slice must be valid.
--     *
--     * @throws IllegalArgumentException if the input slices are not valid.
--     */
-     public static void validate(ColumnSlice[] slices, AbstractType<?> 
comparator, boolean reversed)
-     {
-         for (int i = 0; i < slices.length; i++)
-         {
-             ColumnSlice slice = slices[i];
-             validate(slice, comparator, reversed);
-             if (i > 0)
-             {
-                 if (slices[i - 1].finish.remaining() == 0 || 
slice.start.remaining() == 0)
-                     throw new IllegalArgumentException("Invalid column 
slices: slices must be sorted and non-overlapping");
- 
-                 int cmp = comparator.compare(slices[i -1].finish, 
slice.start);
-                 if (reversed ? cmp <= 0 : cmp >= 0)
-                     throw new IllegalArgumentException("Invalid column 
slices: slices must be sorted and non-overlapping");
-             }
-         }
-     }
- 
-     /**
-      * Validate a column slices.
-      * To be valid, the slice start must sort before the slice end.
-      *
-      * @throws IllegalArgumentException if the slice is not valid.
-      */
-     public static void validate(ColumnSlice slice, AbstractType<?> 
comparator, boolean reversed)
 -    //public static void validate(ColumnSlice[] slices, AbstractType<?> 
comparator, boolean reversed)
 -    //{
 -    //    for (int i = 0; i < slices.length; i++)
 -    //    {
 -    //        ColumnSlice slice = slices[i];
 -    //        validate(slice, comparator, reversed);
 -    //        if (i > 0)
 -    //        {
 -    //            if (slices[i - 1].finish.remaining() == 0 || 
slice.start.remaining() == 0)
 -    //                throw new IllegalArgumentException("Invalid column 
slices: slices must be sorted and non-overlapping");
 -
 -    //            int cmp = comparator.compare(slices[i -1].finish, 
slice.start);
 -    //            if (reversed ? cmp <= 0 : cmp >= 0)
 -    //                throw new IllegalArgumentException("Invalid column 
slices: slices must be sorted and non-overlapping");
 -    //        }
 -    //    }
 -    //}
 -
 -    /**
 -     * Validate a column slices.
 -     * To be valid, the slice start must sort before the slice end.
 -     *
 -     * @throws IllegalArgumentException if the slice is not valid.
 -     */
 -    //public static void validate(ColumnSlice slice, AbstractType<?> 
comparator, boolean reversed)
 -    //{
 -    //    if (slice.isAlwaysEmpty(comparator, reversed))
 -    //        throw new IllegalArgumentException("Slice finish must come 
after start in traversal order");
 -    //}
 -
+     public boolean isAlwaysEmpty(AbstractType<?> comparator, boolean reversed)
      {
          Comparator<ByteBuffer> orderedComparator = reversed ? comparator.reverseComparator : comparator;
-         if (slice.start.remaining() > 0 && slice.finish.remaining() > 0 && 
orderedComparator.compare(slice.start, slice.finish) > 0)
-             throw new IllegalArgumentException("Slice finish must come after 
start in traversal order");
+         return (start.remaining() > 0 && finish.remaining() > 0 && orderedComparator.compare(start, finish) > 0);
      }
  
      public boolean includes(Comparator<ByteBuffer> cmp, ByteBuffer name)

Reply via email to