Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 32112d80e -> 474d3bf8c
Fix paging for range queries where all clustering columns are specified patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-11669 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/474d3bf8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/474d3bf8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/474d3bf8 Branch: refs/heads/cassandra-3.0 Commit: 474d3bf8c14e2b572efd9b6a66703f9cbe1164c5 Parents: 32112d8 Author: Benjamin Lerer <[email protected]> Authored: Fri Apr 29 12:01:12 2016 +0200 Committer: Benjamin Lerer <[email protected]> Committed: Fri Apr 29 12:01:12 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/PartitionRangeReadCommand.java | 5 +- .../service/pager/PartitionRangeQueryPager.java | 130 ++++++++++++++++++ .../service/pager/RangeNamesQueryPager.java | 92 ------------- .../service/pager/RangeSliceQueryPager.java | 131 ------------------- 5 files changed, 132 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/474d3bf8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 46206b1..268d011 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.6 + * Fix paging for range queries where all clustering columns are specified (CASSANDRA-11669) * Don't require HEAP_NEW_SIZE to be set when using G1 (CASSANDRA-11600) * Fix sstabledump not showing cells after tombstone marker (CASSANDRA-11654) * Ignore all LocalStrategy keyspaces for streaming and other related http://git-wip-us.apache.org/repos/asf/cassandra/blob/474d3bf8/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 9fce15e..9585b59 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -165,10 +165,7 @@ public class PartitionRangeReadCommand extends ReadCommand public QueryPager getPager(PagingState pagingState, int protocolVersion) { - if (isNamesQuery()) - return new RangeNamesQueryPager(this, pagingState, protocolVersion); - else - return new RangeSliceQueryPager(this, pagingState, protocolVersion); + return new PartitionRangeQueryPager(this, pagingState, protocolVersion); } protected void recordLatency(TableMetrics metric, long latencyNanos) http://git-wip-us.apache.org/repos/asf/cassandra/blob/474d3bf8/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java new file mode 100644 index 0000000..9c216e3 --- /dev/null +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.pager; + +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.index.Index; +import org.apache.cassandra.schema.IndexMetadata; + +/** + * Pages a PartitionRangeReadCommand. + * + * Note: this only works for CQL3 queries for now (because thrift queries expect + * a different limit on the rows than on the columns, which complicates it). + */ +public class PartitionRangeQueryPager extends AbstractQueryPager +{ + private static final Logger logger = LoggerFactory.getLogger(PartitionRangeQueryPager.class); + + private volatile DecoratedKey lastReturnedKey; + private volatile PagingState.RowMark lastReturnedRow; + + public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) + { + super(command, protocolVersion); + + if (state != null) + { + lastReturnedKey = command.metadata().decorateKey(state.partitionKey); + lastReturnedRow = state.rowMark; + restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); + } + } + + public PagingState state() + { + return lastReturnedKey == null + ? 
null + : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition()); + } + + protected ReadCommand nextPageReadCommand(int pageSize) + throws RequestExecutionException + { + DataLimits limits; + DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange(); + DataRange pageRange; + if (lastReturnedKey == null) + { + pageRange = fullRange; + limits = command.limits().forPaging(pageSize); + } + else + { + // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother. + boolean includeLastKey = remainingInPartition() > 0 && lastReturnedRow != null; + AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey); + if (includeLastKey) + { + pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false); + limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); + } + else + { + pageRange = fullRange.forSubRange(bounds); + limits = command.limits().forPaging(pageSize); + } + } + + Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata())); + Optional<IndexMetadata> indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty(); + return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata); + } + + protected void recordLast(DecoratedKey key, Row last) + { + if (last != null) + { + lastReturnedKey = key; + if (last.clustering() != Clustering.STATIC_CLUSTERING) + lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion); + } + } + + protected boolean isPreviouslyReturnedPartition(DecoratedKey key) + { + // Note that lastReturnedKey can be null, but key cannot. 
+ return key.equals(lastReturnedKey); + } + + private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey) + { + AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); + if (bounds instanceof Range || bounds instanceof Bounds) + { + return includeLastKey + ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right) + : new Range<PartitionPosition>(lastReturnedKey, bounds.right); + } + else + { + return includeLastKey + ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right) + : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/474d3bf8/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java deleted file mode 100644 index 9801565..0000000 --- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service.pager; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.RequestExecutionException; - -/** - * Pages a RangeSliceCommand whose predicate is a name query. - * - * Note: this only work for NamesQueryFilter that have countCQL3Rows() set, - * because this assumes the pageSize is counted in number of internal rows - * returned. More precisely, this doesn't do in-row paging so this assumes - * that the counter returned by columnCounter() will count 1 for each internal - * row. - */ -public class RangeNamesQueryPager extends AbstractQueryPager -{ - private volatile DecoratedKey lastReturnedKey; - - public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) - { - super(command, protocolVersion); - assert command.isNamesQuery(); - - if (state != null) - { - lastReturnedKey = command.metadata().decorateKey(state.partitionKey); - restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); - } - } - - public PagingState state() - { - return lastReturnedKey == null - ? null - : new PagingState(lastReturnedKey.getKey(), null, maxRemaining(), remainingInPartition()); - } - - protected ReadCommand nextPageReadCommand(int pageSize) - throws RequestExecutionException - { - PartitionRangeReadCommand pageCmd = ((PartitionRangeReadCommand)command).withUpdatedLimit(command.limits().forPaging(pageSize)); - if (lastReturnedKey != null) - pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey)); - - return pageCmd; - } - - protected void recordLast(DecoratedKey key, Row last) - { - lastReturnedKey = key; - } - - protected boolean isPreviouslyReturnedPartition(DecoratedKey key) - { - // Note that lastReturnedKey can be null, but key cannot. 
- return key.equals(lastReturnedKey); - } - - private AbstractBounds<PartitionPosition> makeExcludingKeyBounds(PartitionPosition lastReturnedKey) - { - // We return a range that always exclude lastReturnedKey, since we've already - // returned it. - AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); - if (bounds instanceof Range || bounds instanceof Bounds) - { - return new Range<PartitionPosition>(lastReturnedKey, bounds.right); - } - else - { - return new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/474d3bf8/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java deleted file mode 100644 index 6ad8649..0000000 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.service.pager; - -import java.util.Optional; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.DataLimits; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.index.Index; -import org.apache.cassandra.schema.IndexMetadata; - -/** - * Pages a RangeSliceCommand whose predicate is a slice query. - * - * Note: this only work for CQL3 queries for now (because thrift queries expect - * a different limit on the rows than on the columns, which complicates it). - */ -public class RangeSliceQueryPager extends AbstractQueryPager -{ - private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class); - - private volatile DecoratedKey lastReturnedKey; - private volatile PagingState.RowMark lastReturnedRow; - - public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state, int protocolVersion) - { - super(command, protocolVersion); - assert !command.isNamesQuery(); - - if (state != null) - { - lastReturnedKey = command.metadata().decorateKey(state.partitionKey); - lastReturnedRow = state.rowMark; - restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); - } - } - - public PagingState state() - { - return lastReturnedKey == null - ? 
null - : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition()); - } - - protected ReadCommand nextPageReadCommand(int pageSize) - throws RequestExecutionException - { - DataLimits limits; - DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange(); - DataRange pageRange; - if (lastReturnedKey == null) - { - pageRange = fullRange; - limits = command.limits().forPaging(pageSize); - } - else - { - // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother. - boolean includeLastKey = remainingInPartition() > 0 && lastReturnedRow != null; - AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey); - if (includeLastKey) - { - pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false); - limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); - } - else - { - pageRange = fullRange.forSubRange(bounds); - limits = command.limits().forPaging(pageSize); - } - } - - Index index = command.getIndex(Keyspace.openAndGetStore(command.metadata())); - Optional<IndexMetadata> indexMetadata = index != null ? Optional.of(index.getIndexMetadata()) : Optional.empty(); - return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, indexMetadata); - } - - protected void recordLast(DecoratedKey key, Row last) - { - if (last != null) - { - lastReturnedKey = key; - if (last.clustering() != Clustering.STATIC_CLUSTERING) - lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion); - } - } - - protected boolean isPreviouslyReturnedPartition(DecoratedKey key) - { - // Note that lastReturnedKey can be null, but key cannot. 
- return key.equals(lastReturnedKey); - } - - private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey) - { - AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); - if (bounds instanceof Range || bounds instanceof Bounds) - { - return includeLastKey - ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right) - : new Range<PartitionPosition>(lastReturnedKey, bounds.right); - } - else - { - return includeLastKey - ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right) - : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); - } - } -}
