Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1b5e3a9b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1b5e3a9b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1b5e3a9b Branch: refs/heads/trunk Commit: 1b5e3a9b1be0c945782492e269acb4ea44730ad3 Parents: a880739 98be5de Author: blerer <[email protected]> Authored: Tue Oct 20 14:38:51 2015 +0200 Committer: blerer <[email protected]> Committed: Tue Oct 20 14:39:39 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../service/pager/AbstractQueryPager.java | 21 ++++++++++++++++++-- .../service/pager/RangeSliceQueryPager.java | 5 +++-- .../service/pager/SinglePartitionPager.java | 3 +-- 4 files changed, 24 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 25ad1fb,458d0d5..616ff47 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,35 -1,20 +1,36 @@@ -2.2.4 +3.0 +Merged from 2.2: * Expose phi values from failure detector via JMX and tweak debug and trace logging (CASSANDRA-9526) - * Fix RangeNamesQueryPager (CASSANDRA-10509) - * Deprecate Pig support (CASSANDRA-10542) - * Reduce contention getting instances of CompositeType (CASSANDRA-10433) Merged from 2.1: + * Fix paging issues with partitions containing only static columns data (CASSANDRA-10381) * Fix conditions on static columns (CASSANDRA-10264) * AssertionError: attempted to delete non-existing file CommitLog (CASSANDRA-10377) - * (cqlsh) Distinguish negative and positive infinity in output (CASSANDRA-10523) - * (cqlsh) allow custom time_format for COPY TO (CASSANDRA-8970) - * Don't allow startup if the node's rack has changed (CASSANDRA-10242) - * (cqlsh) show partial trace if incomplete after 
max_trace_wait (CASSANDRA-7645) -2.2.3 +3.0-rc2 + * Fix SELECT DISTINCT queries between 2.2.2 nodes and 3.0 nodes (CASSANDRA-10473) + * Remove circular references in SegmentedFile (CASSANDRA-10543) + * Ensure validation of indexed values only occurs once per-partition (CASSANDRA-10536) + * Fix handling of static columns for range tombstones in thrift (CASSANDRA-10174) + * Support empty ColumnFilter for backward compatility on empty IN (CASSANDRA-10471) + * Remove Pig support (CASSANDRA-10542) + * Fix LogFile throws Exception when assertion is disabled (CASSANDRA-10522) + * Revert CASSANDRA-7486, make CMS default GC, move GC config to + conf/jvm.options (CASSANDRA-10403) + * Fix TeeingAppender causing some logs to be truncated/empty (CASSANDRA-10447) + * Allow EACH_QUORUM for reads (CASSANDRA-9602) + * Fix potential ClassCastException while upgrading (CASSANDRA-10468) + * Fix NPE in MVs on update (CASSANDRA-10503) + * Only include modified cell data in indexing deltas (CASSANDRA-10438) + * Do not load keyspace when creating sstable writer (CASSANDRA-10443) + * If node is not yet gossiping write all MV updates to batchlog only (CASSANDRA-10413) + * Re-populate token metadata after commit log recovery (CASSANDRA-10293) + * Provide additional metrics for materialized views (CASSANDRA-10323) + * Flush system schema tables after local schema changes (CASSANDRA-10429) +Merged from 2.2: + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) + * Fix the regression when using LIMIT with aggregates (CASSANDRA-10487) * Avoid NoClassDefFoundError during DataDescriptor initialization on windows (CASSANDRA-10412) * Preserve case of quoted Role & User names (CASSANDRA-10394) * cqlsh pg-style-strings broken (CASSANDRA-10484) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --cc 
src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index b92d1e1,2a35e4b..bdebd43 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@@ -17,179 -17,381 +17,196 @@@ */ package org.apache.cassandra.service.pager; -import java.util.*; +import java.util.NoSuchElementException; -import com.google.common.annotations.VisibleForTesting; - --import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnCounter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.service.ClientState; abstract class AbstractQueryPager implements QueryPager { - private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class); - - private final ConsistencyLevel consistencyLevel; - private final boolean localQuery; - - protected final CFMetaData cfm; - protected final IDiskAtomFilter columnFilter; - private final long timestamp; + protected final ReadCommand command; + protected final DataLimits limits; + protected final int protocolVersion; private int remaining; - private boolean exhausted; - private boolean shouldFetchExtraRow; - - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - String keyspace, - String columnFamily, - IDiskAtomFilter columnFilter, - long timestamp) - { - this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, 
timestamp); - } - - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - CFMetaData cfm, - IDiskAtomFilter columnFilter, - long timestamp) - { - this.consistencyLevel = consistencyLevel; - this.localQuery = localQuery; - - this.cfm = cfm; - this.columnFilter = columnFilter; - this.timestamp = timestamp; - - this.remaining = toFetch; - } - - - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException - { - if (isExhausted()) - return Collections.emptyList(); - - int currentPageSize = nextPageSize(pageSize); - List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery)); - - if (rows.isEmpty()) - { - logger.debug("Got empty set of rows, considering pager exhausted"); - exhausted = true; - return Collections.emptyList(); - } - - int liveCount = getPageLiveCount(rows); - logger.debug("Fetched {} live rows", liveCount); - - // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked - // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient - // but most of the time there should be nothing or very little to trim. - if (liveCount > currentPageSize) - { - rows = discardLast(rows, liveCount - currentPageSize); - liveCount = currentPageSize; - } - - remaining -= liveCount; - - // If we've got less than requested, there is no more query to do (but - // we still need to return the current page) - if (liveCount < currentPageSize) - { - logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize); - exhausted = true; - } - - // If it's not the first query and the first column is the last one returned (likely - // but not certain since paging can race with deletes/expiration), then remove the - // first column. 
- if (containsPreviousLast(rows.get(0))) - { - rows = discardFirst(rows); - remaining++; - } - // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size, - // so if the page is full, trim the last entry - else if (shouldFetchExtraRow && !exhausted) - { - // We've asked for one more than necessary - rows = discardLast(rows); - remaining++; - } - - logger.debug("Remaining rows to page: {}", remaining); - if (!isExhausted()) - shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1)); + // This is the last key we've been reading from (or can still be reading within). This the key for + // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition + // (and this is done in PagerIterator). This can be null (when we start). + private DecoratedKey lastKey; + private int remainingInPartition; - return rows; - } + private boolean exhausted; - private List<Row> filterEmpty(List<Row> result) + protected AbstractQueryPager(ReadCommand command, int protocolVersion) { - for (Row row : result) - { - if (row.cf == null || !row.cf.hasColumns()) - { - List<Row> newResult = new ArrayList<Row>(result.size() - 1); - for (Row row2 : result) - { - if (row2.cf == null || !row2.cf.hasColumns()) - continue; + this.command = command; + this.protocolVersion = protocolVersion; + this.limits = command.limits(); - newResult.add(row2); - } - return newResult; - } - } - return result; + this.remaining = limits.count(); + this.remainingInPartition = limits.perPartitionCount(); } - protected void restoreState(int remaining, boolean shouldFetchExtraRow) + public ReadOrderGroup startOrderGroup() { - this.remaining = remaining; - this.shouldFetchExtraRow = shouldFetchExtraRow; + return command.startOrderGroup(); } - public boolean isExhausted() + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException { - return 
exhausted || remaining == 0; - } + if (isExhausted()) + return PartitionIterators.EMPTY; - public int maxRemaining() - { - return remaining; + pageSize = Math.min(pageSize, remaining); + return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec()); } - public long timestamp() + public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { - return timestamp; - } + if (isExhausted()) + return PartitionIterators.EMPTY; - private int nextPageSize(int pageSize) - { - return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 1 : 0); + pageSize = Math.min(pageSize, remaining); + return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec()); } - public ColumnCounter columnCounter() + private class PagerIterator extends CountingPartitionIterator { - return columnFilter.columnCounter(cfm.comparator, timestamp); - } + private final DataLimits pageLimits; - protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException; + private Row lastRow; - /** - * Checks to see if the first row of a new page contains the last row from the previous page. - * @param first the first row of the new page - * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise - */ - protected abstract boolean containsPreviousLast(Row first); + private boolean isFirstPartition = true; + private RowIterator nextPartition; - /** - * Saves the paging state by recording the last seen partition key and cell name (where applicable). 
- * @param last the last row in the current page - * @return true if an extra row should be fetched in the next page,false otherwise - */ - protected abstract boolean recordLast(Row last); - - protected abstract boolean isReversed(); - - private List<Row> discardFirst(List<Row> rows) - { - return discardFirst(rows, 1); - } - - @VisibleForTesting - List<Row> discardFirst(List<Row> rows, int toDiscard) - { - if (toDiscard == 0 || rows.isEmpty()) - return rows; - - int i = 0; - DecoratedKey firstKey = null; - ColumnFamily firstCf = null; - while (toDiscard > 0 && i < rows.size()) + private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec) { - Row first = rows.get(i++); - firstKey = first.key; - firstCf = first.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? discardLast(first.cf, toDiscard, firstCf) - : discardFirst(first.cf, toDiscard, firstCf); + super(iter, pageLimits, nowInSec); + this.pageLimits = pageLimits; } - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the first row that we are sure to keep. On top of that, - // we also keep firstCf is it hasn't been fully emptied by the last iteration above. - int count = firstCf.getColumnCount(); - int newSize = rows.size() - (count == 0 ? 
i : i - 1); - List<Row> newRows = new ArrayList<Row>(newSize); - if (count != 0) - newRows.add(new Row(firstKey, firstCf)); - newRows.addAll(rows.subList(i, rows.size())); + @Override + @SuppressWarnings("resource") // iter is closed by closing the result or in close() + public boolean hasNext() + { + while (nextPartition == null && super.hasNext()) + { + if (nextPartition == null) + nextPartition = super.next(); - return newRows; - } + DecoratedKey key = nextPartition.partitionKey(); + if (lastKey == null || !lastKey.equals(key)) + remainingInPartition = limits.perPartitionCount(); - private List<Row> discardLast(List<Row> rows) - { - return discardLast(rows, 1); - } + lastKey = key; - @VisibleForTesting - List<Row> discardLast(List<Row> rows, int toDiscard) - { - if (toDiscard == 0 || rows.isEmpty()) - return rows; + // If this is the first partition of this page, this could be the continuation of a partition we've started + // on the previous page. In which case, we could have the problem that the partition has no more "regular" + // rows (but the page size is such that we didn't know before) but it does have a static row. We should then skip + // the partition as returning it would mean to the upper layer that the partition has "only" static columns, + // which is not the case (and we know the static results have been sent on the previous page). + if (isFirstPartition && isPreviouslyReturnedPartition(key) && !nextPartition.hasNext()) + { + nextPartition.close(); + nextPartition = null; + } - int i = rows.size()-1; - DecoratedKey lastKey = null; - ColumnFamily lastCf = null; - while (toDiscard > 0 && i >= 0) - { - Row last = rows.get(i--); - lastKey = last.key; - lastCf = last.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? 
discardFirst(last.cf, toDiscard, lastCf) - : discardLast(last.cf, toDiscard, lastCf); + isFirstPartition = false; + } + return nextPartition != null; } - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the last row that we are sure to keep. On top of that, - // we also keep lastCf is it hasn't been fully emptied by the last iteration above. - int count = lastCf.getColumnCount(); - int newSize = count == 0 ? i+1 : i+2; - List<Row> newRows = new ArrayList<Row>(newSize); - newRows.addAll(rows.subList(0, i+1)); - if (count != 0) - newRows.add(new Row(lastKey, lastCf)); - - return newRows; - } + @Override + @SuppressWarnings("resource") // iter is closed by closing the result + public RowIterator next() + { + if (!hasNext()) + throw new NoSuchElementException(); - private int getPageLiveCount(List<Row> page) - { - int count = 0; - for (Row row : page) - count += columnCounter().countAll(row.cf).live(); - return count; - } + RowIterator toReturn = nextPartition; + nextPartition = null; - private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester) - : discardHead(toDiscard, newCf, cf.iterator(), tester); - } + return new RowPagerIterator(toReturn); + } - private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? 
discardHead(toDiscard, newCf, cf.reverseIterator(), tester) - : discardTail(cf, toDiscard, newCf, cf.iterator(), tester); - } + @Override + public void close() + { + super.close(); + if (nextPartition != null) + nextPartition.close(); - private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) - { - ColumnCounter counter = columnCounter(); + recordLast(lastKey, lastRow); - List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size()); + int counted = counter.counted(); + remaining -= counted; - remainingInPartition -= counter.countedInCurrentPartition(); ++ // If the clustering of the last row returned is a static one, it means that the partition was only ++ // containing data within the static columns. Therefore, there is no data remaining within the partition. ++ if (lastRow != null && lastRow.clustering() == Clustering.STATIC_CLUSTERING) ++ { ++ remainingInPartition = 0; ++ } ++ else ++ { ++ remainingInPartition -= counter.countedInCurrentPartition(); ++ } + exhausted = counted < pageLimits.count(); + } - // Discard the first 'toDiscard' live, non-static cells - while (iter.hasNext()) + private class RowPagerIterator extends WrappingRowIterator { - Cell c = iter.next(); - - // if it's a static column, don't count it and save it to add to the trimmed results - ColumnDefinition columnDef = cfm.getColumnDefinition(c.name()); - if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC) + RowPagerIterator(RowIterator iter) { - staticCells.add(c); - continue; + super(iter); } - counter.count(c, tester); - - // once we've discarded the required amount, add the rest - if (counter.live() > toDiscard) + @Override ++ public Row staticRow() + { - for (Cell staticCell : staticCells) - copy.addColumn(staticCell); ++ Row staticRow = super.staticRow(); ++ if (!staticRow.isEmpty()) ++ lastRow = staticRow; ++ return staticRow; ++ } + - copy.addColumn(c); - while (iter.hasNext()) - 
copy.addColumn(iter.next()); ++ @Override + public Row next() + { + lastRow = super.next(); + return lastRow; } } - int live = counter.live(); - // We want to take into account the row even if it was containing only static columns - if (live == 0 && !staticCells.isEmpty()) - live = 1; - return Math.min(live, toDiscard); } - private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) + protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition) { - // Redoing the counting like that is not extremely efficient. - // This is called only for reversed slices or in the case of a race between - // paging and a deletion (pretty unlikely), so this is probably acceptable. - int liveCount = columnCounter().countAll(cf).live(); - - ColumnCounter counter = columnCounter(); - // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard') - while (iter.hasNext()) - { - Cell c = iter.next(); - counter.count(c, tester); - if (counter.live() > liveCount - toDiscard) - break; + this.lastKey = lastKey; + this.remaining = remaining; + this.remainingInPartition = remainingInPartition; + } - copy.addColumn(c); - } - return Math.min(liveCount, toDiscard); + public boolean isExhausted() + { + return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0); } - /** - * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column - * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal - * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we - * need to start from the last non-static cell. 
- */ - protected Cell firstNonStaticCell(ColumnFamily cf) + public int maxRemaining() { - for (Cell cell : cf) - { - ColumnDefinition def = cfm.getColumnDefinition(cell.name()); - if (def == null || def.kind != ColumnDefinition.Kind.STATIC) - return cell; - } - return null; + return remaining; } - protected static Cell lastCell(ColumnFamily cf) + protected int remainingInPartition() { - return cf.getReverseSortedColumns().iterator().next(); + return remainingInPartition; } + + protected abstract ReadCommand nextPageReadCommand(int pageSize); + protected abstract void recordLast(DecoratedKey key, Row row); + protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index 770875a,fd14c82..fd35b29 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@@ -58,65 -66,68 +58,66 @@@ public class RangeSliceQueryPager exten { return lastReturnedKey == null ? null - : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(), maxRemaining()); + : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition()); } - protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) + protected ReadCommand nextPageReadCommand(int pageSize) throws RequestExecutionException { - SliceQueryFilter sf = (SliceQueryFilter)columnFilter; - AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey); - Composite start = lastReturnedName == null ? 
sf.start() : lastReturnedName; - PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace, - command.columnFamily, - command.timestamp, - keyRange, - sf, - start, - sf.finish(), - command.rowFilter, - pageSize, - command.countCQL3Rows); - - return localQuery - ? pageCmd.executeLocally() - : StorageProxy.getRangeSlice(pageCmd, consistencyLevel); - } - - protected boolean containsPreviousLast(Row first) - { - if (lastReturnedKey == null || !lastReturnedKey.equals(first.key)) - return false; - - // Same as SliceQueryPager, we ignore a deleted column - Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf); - // If the row was containing only static columns it has already been returned and we can skip it. - if (firstCell == null) - return true; + DataLimits limits; + DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange(); + DataRange pageRange; + if (lastReturnedKey == null) + { + pageRange = fullRange; + limits = command.limits().forPaging(pageSize); + } + else + { + // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother. 
- boolean includeLastKey = remainingInPartition() > 0; ++ boolean includeLastKey = remainingInPartition() > 0 && lastReturnedRow != null; + AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey); + if (includeLastKey) + { + pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false); + limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); + } + else + { + pageRange = fullRange.forSubRange(bounds); + limits = command.limits().forPaging(pageSize); + } + } - CFMetaData metadata = Schema.instance.getCFMetaData(command.keyspace, command.columnFamily); - return !first.cf.deletionInfo().isDeleted(firstCell) - && firstCell.isLive(timestamp()) - && firstCell.name().isSameCQL3RowAs(metadata.comparator, lastReturnedName); + // it won't hurt for the next page command to query the index manager + // again to check for an applicable index, so don't supply one here + return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange, Optional.empty()); } - protected boolean recordLast(Row last) + protected void recordLast(DecoratedKey key, Row last) { - lastReturnedKey = last.key; - lastReturnedName = (isReversed() ? firstNonStaticCell(last.cf) : lastCell(last.cf)).name(); - return true; + if (last != null) + { + lastReturnedKey = key; - lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion); ++ if (last.clustering() != Clustering.STATIC_CLUSTERING) ++ lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion); + } } - protected boolean isReversed() + protected boolean isPreviouslyReturnedPartition(DecoratedKey key) { - return ((SliceQueryFilter)command.predicate).reversed; + // Note that lastReturnedKey can be null, but key cannot. 
+ return key.equals(lastReturnedKey); } - private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey) + private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey) { - // We always include lastReturnedKey since we may still be paging within a row, - // and PagedRangeCommand will move over if we're not anyway - AbstractBounds<RowPosition> bounds = command.keyRange; + AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); if (bounds instanceof Range || bounds instanceof Bounds) { - return new Bounds<RowPosition>(lastReturnedKey, bounds.right); + return includeLastKey + ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right) + : new Range<PartitionPosition>(lastReturnedKey, bounds.right); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/1b5e3a9b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index 7057e79,51bbf90..70d4559 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@@ -31,57 -26,8 +31,56 @@@ import org.apache.cassandra.db.filter.* * * For use by MultiPartitionPager. 
*/ -public interface SinglePartitionPager extends QueryPager +public class SinglePartitionPager extends AbstractQueryPager { - public ByteBuffer key(); - public ColumnCounter columnCounter(); + private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class); + + private final SinglePartitionReadCommand<?> command; + + private volatile PagingState.RowMark lastReturned; + + public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state, int protocolVersion) + { + super(command, protocolVersion); + this.command = command; + + if (state != null) + { + lastReturned = state.rowMark; + restoreState(command.partitionKey(), state.remaining, state.remainingInPartition); + } + } + + public ByteBuffer key() + { + return command.partitionKey().getKey(); + } + + public DataLimits limits() + { + return command.limits(); + } + + public PagingState state() + { + return lastReturned == null + ? null + : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition()); + } + + protected ReadCommand nextPageReadCommand(int pageSize) + { + return command.forPaging(lastReturned == null ? null : lastReturned.clustering(command.metadata()), pageSize); + } + + protected void recordLast(DecoratedKey key, Row last) + { - if (last != null) ++ if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING) + lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion); + } + + protected boolean isPreviouslyReturnedPartition(DecoratedKey key) + { - // We're querying a single partition, so if it's not the first page, it is the previously returned one. + return lastReturned != null; + } }
