http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index c330eea..2c16ace 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -17,156 +17,133 @@ */ package org.apache.cassandra.service.pager; -import java.util.*; - -import com.google.common.annotations.VisibleForTesting; - import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnCounter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.cassandra.service.ClientState; abstract class AbstractQueryPager implements QueryPager { - private static final Logger logger = LoggerFactory.getLogger(AbstractQueryPager.class); + protected final ReadCommand command; + protected final DataLimits limits; - private final ConsistencyLevel consistencyLevel; - private final boolean localQuery; + private int remaining; - protected final CFMetaData cfm; - protected final IDiskAtomFilter columnFilter; - private final long timestamp; + // This is the last key we've been reading from (or can still be reading within). This is the key for + // which remainingInPartition makes sense: if we're starting another key, we should reset remainingInPartition + // (and this is done in PagerIterator). This can be null (when we start).
+ private DecoratedKey lastKey; + private int remainingInPartition; - private int remaining; private boolean exhausted; - private boolean shouldFetchExtraRow; - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - String keyspace, - String columnFamily, - IDiskAtomFilter columnFilter, - long timestamp) + protected AbstractQueryPager(ReadCommand command) { - this(consistencyLevel, toFetch, localQuery, Schema.instance.getCFMetaData(keyspace, columnFamily), columnFilter, timestamp); + this.command = command; + this.limits = command.limits(); + + this.remaining = limits.count(); + this.remainingInPartition = limits.perPartitionCount(); } - protected AbstractQueryPager(ConsistencyLevel consistencyLevel, - int toFetch, - boolean localQuery, - CFMetaData cfm, - IDiskAtomFilter columnFilter, - long timestamp) + public ReadOrderGroup startOrderGroup() { - this.consistencyLevel = consistencyLevel; - this.localQuery = localQuery; + return command.startOrderGroup(); + } - this.cfm = cfm; - this.columnFilter = columnFilter; - this.timestamp = timestamp; + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException + { + if (isExhausted()) + return PartitionIterators.EMPTY; - this.remaining = toFetch; + pageSize = Math.min(pageSize, remaining); + return new PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), limits.forPaging(pageSize), command.nowInSec()); } - - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException + public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { if (isExhausted()) - return Collections.emptyList(); + return PartitionIterators.EMPTY; - int currentPageSize = nextPageSize(pageSize); - List<Row> rows = filterEmpty(queryNextPage(currentPageSize, consistencyLevel, localQuery)); + pageSize = Math.min(pageSize, remaining); + return new PagerIterator(nextPageReadCommand(pageSize).executeInternal(orderGroup), limits.forPaging(pageSize), command.nowInSec()); + } - if (rows.isEmpty()) - { - logger.debug("Got empty set of rows, considering pager exhausted"); - exhausted = true; - return Collections.emptyList(); - } + private class PagerIterator extends CountingPartitionIterator + { + private final DataLimits pageLimits; - int liveCount = getPageLiveCount(rows); - logger.debug("Fetched {} live rows", liveCount); + private Row lastRow; - // Because SP.getRangeSlice doesn't trim the result (see SP.trim()), liveCount may be greater than what asked - // (currentPageSize). This would throw off the paging logic so we trim the excess. It's not extremely efficient - // but most of the time there should be nothing or very little to trim. 
- if (liveCount > currentPageSize) + private PagerIterator(PartitionIterator iter, DataLimits pageLimits, int nowInSec) { - rows = discardLast(rows, liveCount - currentPageSize); - liveCount = currentPageSize; + super(iter, pageLimits, nowInSec); + this.pageLimits = pageLimits; } - remaining -= liveCount; - - // If we've got less than requested, there is no more query to do (but - // we still need to return the current page) - if (liveCount < currentPageSize) + @Override + @SuppressWarnings("resource") // iter is closed by closing the result + public RowIterator next() { - logger.debug("Got result ({}) smaller than page size ({}), considering pager exhausted", liveCount, currentPageSize); - exhausted = true; - } + RowIterator iter = super.next(); + try + { + DecoratedKey key = iter.partitionKey(); + if (lastKey == null || !lastKey.equals(key)) + remainingInPartition = limits.perPartitionCount(); - // If it's not the first query and the first column is the last one returned (likely - // but not certain since paging can race with deletes/expiration), then remove the - // first column. - if (containsPreviousLast(rows.get(0))) - { - rows = discardFirst(rows); - remaining++; - } - // Otherwise, if 'shouldFetchExtraRow' was set, we queried for one more than the page size, - // so if the page is full, trim the last entry - else if (shouldFetchExtraRow && !exhausted) - { - // We've asked for one more than necessary - rows = discardLast(rows); - remaining++; + lastKey = key; + return new RowPagerIterator(iter); + } + catch (RuntimeException e) + { + iter.close(); + throw e; + } } - logger.debug("Remaining rows to page: {}", remaining); - - if (!isExhausted()) - shouldFetchExtraRow = recordLast(rows.get(rows.size() - 1)); + @Override + public void close() + { + super.close(); + recordLast(lastKey, lastRow); - return rows; - } + int counted = counter.counted(); + remaining -= counted; + remainingInPartition -= counter.countedInCurrentPartition(); + exhausted = counted < pageLimits.count(); + } - private List<Row> filterEmpty(List<Row> result) - { - for (Row row : result) + private class RowPagerIterator extends WrappingRowIterator { - if (row.cf == null || !row.cf.hasColumns()) + RowPagerIterator(RowIterator iter) { - List<Row> newResult = new ArrayList<Row>(result.size() - 1); - for (Row row2 : result) - { - if (row2.cf == null || !row2.cf.hasColumns()) - continue; + super(iter); + } - newResult.add(row2); - } - return newResult; + @Override + public Row next() + { + lastRow = super.next(); + return lastRow; } } - return result; } - protected void restoreState(int remaining, boolean shouldFetchExtraRow) + protected void restoreState(DecoratedKey lastKey, int remaining, int remainingInPartition) { + this.lastKey = lastKey; this.remaining = remaining; - this.shouldFetchExtraRow = shouldFetchExtraRow; + this.remainingInPartition = remainingInPartition; } public boolean isExhausted() { - return exhausted || remaining == 0; + return exhausted || remaining == 0 || ((this instanceof SinglePartitionPager) && remainingInPartition == 0); } public int maxRemaining() @@ -174,220 +151,11 @@ abstract class AbstractQueryPager implements QueryPager return remaining; } - public long timestamp() + protected int remainingInPartition() { - return timestamp; - } - - private int nextPageSize(int pageSize) - { - return Math.min(remaining, pageSize) + (shouldFetchExtraRow ? 
1 : 0); - } - - public ColumnCounter columnCounter() - { - return columnFilter.columnCounter(cfm.comparator, timestamp); - } - - protected abstract List<Row> queryNextPage(int pageSize, ConsistencyLevel consistency, boolean localQuery) throws RequestValidationException, RequestExecutionException; - - /** - * Checks to see if the first row of a new page contains the last row from the previous page. - * @param first the first row of the new page - * @return true if <code>first</code> contains the last from from the previous page and it is live, false otherwise - */ - protected abstract boolean containsPreviousLast(Row first); - - /** - * Saves the paging state by recording the last seen partition key and cell name (where applicable). - * @param last the last row in the current page - * @return true if an extra row should be fetched in the next page,false otherwise - */ - protected abstract boolean recordLast(Row last); - - protected abstract boolean isReversed(); - - private List<Row> discardFirst(List<Row> rows) - { - return discardFirst(rows, 1); - } - - @VisibleForTesting - List<Row> discardFirst(List<Row> rows, int toDiscard) - { - if (toDiscard == 0 || rows.isEmpty()) - return rows; - - int i = 0; - DecoratedKey firstKey = null; - ColumnFamily firstCf = null; - while (toDiscard > 0 && i < rows.size()) - { - Row first = rows.get(i++); - firstKey = first.key; - firstCf = first.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? discardLast(first.cf, toDiscard, firstCf) - : discardFirst(first.cf, toDiscard, firstCf); - } - - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the first row that we are sure to keep. On top of that, - // we also keep firstCf is it hasn't been fully emptied by the last iteration above. - int count = firstCf.getColumnCount(); - int newSize = rows.size() - (count == 0 ? i : i - 1); - List<Row> newRows = new ArrayList<Row>(newSize); - if (count != 0) - newRows.add(new Row(firstKey, firstCf)); - newRows.addAll(rows.subList(i, rows.size())); - - return newRows; + return remainingInPartition; } - private List<Row> discardLast(List<Row> rows) - { - return discardLast(rows, 1); - } - - @VisibleForTesting - List<Row> discardLast(List<Row> rows, int toDiscard) - { - if (toDiscard == 0 || rows.isEmpty()) - return rows; - - int i = rows.size()-1; - DecoratedKey lastKey = null; - ColumnFamily lastCf = null; - while (toDiscard > 0 && i >= 0) - { - Row last = rows.get(i--); - lastKey = last.key; - lastCf = last.cf.cloneMeShallow(isReversed()); - toDiscard -= isReversed() - ? discardFirst(last.cf, toDiscard, lastCf) - : discardLast(last.cf, toDiscard, lastCf); - } - - // If there is less live data than to discard, all is discarded - if (toDiscard > 0) - return Collections.<Row>emptyList(); - - // i is the index of the last row that we are sure to keep. On top of that, - // we also keep lastCf is it hasn't been fully emptied by the last iteration above. - int count = lastCf.getColumnCount(); - int newSize = count == 0 ? 
i+1 : i+2; - List<Row> newRows = new ArrayList<Row>(newSize); - newRows.addAll(rows.subList(0, i+1)); - if (count != 0) - newRows.add(new Row(lastKey, lastCf)); - - return newRows; - } - - private int getPageLiveCount(List<Row> page) - { - int count = 0; - for (Row row : page) - count += columnCounter().countAll(row.cf).live(); - return count; - } - - private int discardFirst(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? discardTail(cf, toDiscard, newCf, cf.reverseIterator(), tester) - : discardHead(toDiscard, newCf, cf.iterator(), tester); - } - - private int discardLast(ColumnFamily cf, int toDiscard, ColumnFamily newCf) - { - boolean isReversed = isReversed(); - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(isReversed); - return isReversed - ? discardHead(toDiscard, newCf, cf.reverseIterator(), tester) - : discardTail(cf, toDiscard, newCf, cf.iterator(), tester); - } - - private int discardHead(int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) - { - ColumnCounter counter = columnCounter(); - - List<Cell> staticCells = new ArrayList<>(cfm.staticColumns().size()); - - // Discard the first 'toDiscard' live, non-static cells - while (iter.hasNext()) - { - Cell c = iter.next(); - - // if it's a static column, don't count it and save it to add to the trimmed results - ColumnDefinition columnDef = cfm.getColumnDefinition(c.name()); - if (columnDef != null && columnDef.kind == ColumnDefinition.Kind.STATIC) - { - staticCells.add(c); - continue; - } - - counter.count(c, tester); - - // once we've discarded the required amount, add the rest - if (counter.live() > toDiscard) - { - for (Cell staticCell : staticCells) - copy.addColumn(staticCell); - - copy.addColumn(c); - while (iter.hasNext()) - copy.addColumn(iter.next()); - } - } - return Math.min(counter.live(), toDiscard); - } - - private int discardTail(ColumnFamily cf, int toDiscard, ColumnFamily copy, Iterator<Cell> iter, DeletionInfo.InOrderTester tester) - { - // Redoing the counting like that is not extremely efficient. - // This is called only for reversed slices or in the case of a race between - // paging and a deletion (pretty unlikely), so this is probably acceptable. - int liveCount = columnCounter().countAll(cf).live(); - - ColumnCounter counter = columnCounter(); - // Discard the last 'toDiscard' live (so stop adding as sound as we're past 'liveCount - toDiscard') - while (iter.hasNext()) - { - Cell c = iter.next(); - counter.count(c, tester); - if (counter.live() > liveCount - toDiscard) - break; - - copy.addColumn(c); - } - return Math.min(liveCount, toDiscard); - } - - /** - * Returns the first non-static cell in the ColumnFamily. This is necessary to avoid recording a static column - * as the "last" cell seen in a reversed query. Because we will always query static columns alongside the normal - * data for a page, they are not a good indicator of where paging should resume. When we begin the next page, we - * need to start from the last non-static cell. 
- */ - protected Cell firstNonStaticCell(ColumnFamily cf) - { - for (Cell cell : cf) - { - ColumnDefinition def = cfm.getColumnDefinition(cell.name()); - if (def == null || def.kind != ColumnDefinition.Kind.STATIC) - return cell; - } - return null; - } - - protected static Cell lastCell(ColumnFamily cf) - { - return cf.getReverseSortedColumns().iterator().next(); - } + protected abstract ReadCommand nextPageReadCommand(int pageSize); + protected abstract void recordLast(DecoratedKey key, Row row); }
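With this rework a pager no longer returns a List<Row>: each page is a PartitionIterator, and closing that iterator is what lets the pager record the last row seen and update its remaining/remainingInPartition counters (see PagerIterator.close() above). The following sketch shows how a caller might drive the new interface; the drain() helper and the choice of ConsistencyLevel.QUORUM are illustrative assumptions, not part of this patch.

    import org.apache.cassandra.db.ConsistencyLevel;
    import org.apache.cassandra.db.partitions.PartitionIterator;
    import org.apache.cassandra.db.rows.Row;
    import org.apache.cassandra.db.rows.RowIterator;
    import org.apache.cassandra.service.ClientState;

    // Hypothetical helper: consumes a pager page by page until it is exhausted.
    static void drain(QueryPager pager, int pageSize, ClientState clientState)
    {
        while (!pager.isExhausted())
        {
            // Each page must be closed (hence try-with-resources): closing is what
            // makes the pager record where to resume and decrement its counters.
            try (PartitionIterator page = pager.fetchPage(pageSize, ConsistencyLevel.QUORUM, clientState))
            {
                while (page.hasNext())
                {
                    try (RowIterator partition = page.next())
                    {
                        while (partition.hasNext())
                        {
                            Row row = partition.next();
                            // process 'row' for partition.partitionKey() ...
                        }
                    }
                }
            }
        }
    }

For internal (local) paging the same loop applies, except that the caller first opens pager.startOrderGroup() in a try-with-resources block and passes the resulting ReadOrderGroup to fetchPageInternal() instead of calling fetchPage().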
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 35d0971..4fb1429 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -17,10 +17,14 @@ */ package org.apache.cassandra.service.pager; -import java.util.ArrayList; import java.util.List; +import com.google.common.collect.AbstractIterator; + import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.ClientState; @@ -39,53 +43,44 @@ import org.apache.cassandra.service.ClientState; * cfs meanRowSize to decide if parallelizing some of the command might be worth it while being confident we don't * blow out memory. */ -class MultiPartitionPager implements QueryPager +public class MultiPartitionPager implements QueryPager { private final SinglePartitionPager[] pagers; - private final long timestamp; + private final DataLimits limit; + + private final int nowInSec; private int remaining; private int current; - MultiPartitionPager(List<ReadCommand> commands, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state, int limitForQuery) + public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state) { + this.limit = group.limits(); + this.nowInSec = group.nowInSec(); + int i = 0; // If it's not the beginning (state != null), we need to find where we were and skip previous commands // since they are done. if (state != null) - for (; i < commands.size(); i++) - if (commands.get(i).key.equals(state.partitionKey)) + for (; i < group.commands.size(); i++) + if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey)) break; - if (i >= commands.size()) + if (i >= group.commands.size()) { pagers = null; - timestamp = -1; return; } - pagers = new SinglePartitionPager[commands.size() - i]; + pagers = new SinglePartitionPager[group.commands.size() - i]; // 'i' is on the first non exhausted pager for the previous page (or the first one) - pagers[0] = makePager(commands.get(i), consistencyLevel, cState, localQuery, state); - timestamp = commands.get(i).timestamp; + pagers[0] = group.commands.get(i).getPager(state); // Following ones haven't been started yet - for (int j = i + 1; j < commands.size(); j++) - { - ReadCommand command = commands.get(j); - if (command.timestamp != timestamp) - throw new IllegalArgumentException("All commands must have the same timestamp or weird results may happen."); - pagers[j - i] = makePager(command, consistencyLevel, cState, localQuery, null); - } - - remaining = state == null ? limitForQuery : state.remaining; - } + for (int j = i + 1; j < group.commands.size(); j++) + pagers[j - i] = group.commands.get(j).getPager(null); - private static SinglePartitionPager makePager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean localQuery, PagingState state) - { - return command instanceof SliceFromReadCommand - ? 
new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, localQuery, state) : new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, localQuery); + remaining = state == null ? limit.count() : state.remaining; } public PagingState state() @@ -95,7 +90,7 @@ class MultiPartitionPager implements QueryPager return null; PagingState state = pagers[current].state(); - return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining); + return new PagingState(pagers[current].key(), state == null ? null : state.cellName, remaining, Integer.MAX_VALUE); } public boolean isExhausted() @@ -113,35 +108,92 @@ class MultiPartitionPager implements QueryPager return true; } - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException + public ReadOrderGroup startOrderGroup() { - List<Row> result = new ArrayList<Row>(); - - int remainingThisQuery = Math.min(remaining, pageSize); - while (remainingThisQuery > 0 && !isExhausted()) + // Note that for all pagers, the only difference is the partition key to which it applies, so in practice we + // can use any of the sub-pagers' ReadOrderGroup to protect the whole pager + for (int i = current; i < pagers.length; i++) { - // isExhausted has set us on the first non-exhausted pager - List<Row> page = pagers[current].fetchPage(remainingThisQuery); - if (page.isEmpty()) - continue; - - Row row = page.get(0); - int fetched = pagers[current].columnCounter().countAll(row.cf).live(); - remaining -= fetched; - remainingThisQuery -= fetched; - result.add(row); + if (pagers[i] != null) + return pagers[i].startOrderGroup(); } + throw new AssertionError("Shouldn't be called on an exhausted pager"); + } - return result; + @SuppressWarnings("resource") + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException + { + int toQuery = Math.min(remaining, pageSize); + PagersIterator iter = new PagersIterator(toQuery, consistency, clientState, null); + CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec); + iter.setCounter(countingIter.counter()); + return countingIter; } - public int maxRemaining() + public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException { - return remaining; + int toQuery = Math.min(remaining, pageSize); + PagersIterator iter = new PagersIterator(toQuery, null, null, orderGroup); + CountingPartitionIterator countingIter = new CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec); + iter.setCounter(countingIter.counter()); + return countingIter; + } + + private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator + { + private final int pageSize; + private PartitionIterator result; + private DataLimits.Counter counter; + + // For "normal" queries + private final ConsistencyLevel consistency; + private final ClientState clientState; + + // For internal queries + private final ReadOrderGroup orderGroup; + + public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadOrderGroup orderGroup) + { + this.pageSize = pageSize; + this.consistency = consistency; + this.clientState = clientState; + this.orderGroup = orderGroup; + } + + public void setCounter(DataLimits.Counter counter) + { + this.counter = counter; + } + + 
protected RowIterator computeNext() + { + while (result == null || !result.hasNext()) + { + // This sets us on the first non-exhausted pager + if (isExhausted()) + return endOfData(); + + if (result != null) + result.close(); + + int toQuery = pageSize - counter.counted(); + result = consistency == null + ? pagers[current].fetchPageInternal(toQuery, orderGroup) + : pagers[current].fetchPage(toQuery, consistency, clientState); + } + return result.next(); + } + + public void close() + { + remaining -= counter.counted(); + if (result != null) + result.close(); + } } - public long timestamp() + public int maxRemaining() { - return timestamp; + return remaining; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java deleted file mode 100644 index d03e582..0000000 --- a/src/java/org/apache/cassandra/service/pager/NamesQueryPager.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service.pager; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnCounter; -import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.StorageProxy; - -/** - * Pager over a SliceByNamesReadCommand. - */ -public class NamesQueryPager implements SinglePartitionPager -{ - private final SliceByNamesReadCommand command; - private final ConsistencyLevel consistencyLevel; - private final ClientState state; - private final boolean localQuery; - - private volatile boolean queried; - - /** - * For now, we'll only use this in CQL3. In there, as name query can never - * yield more than one CQL3 row, there is no need for paging and so this is straight-forward. - * - * For thrift, we could imagine needing to page, though even then it's very - * unlikely unless the pageSize is very small. - * - * In any case we currently assert in fetchPage if it's a "thrift" query (i.e. a query that - * count every cell individually) and the names filter asks for more than pageSize columns. 
- */ - // Don't use directly, use QueryPagers method instead - NamesQueryPager(SliceByNamesReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, boolean localQuery) - { - this.command = command; - this.consistencyLevel = consistencyLevel; - this.state = state; - this.localQuery = localQuery; - } - - public ByteBuffer key() - { - return command.key; - } - - public ColumnCounter columnCounter() - { - // We know NamesQueryFilter.columnCounter don't care about his argument - return command.filter.columnCounter(null, command.timestamp); - } - - public PagingState state() - { - return null; - } - - public boolean isExhausted() - { - return queried; - } - - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException - { - assert command.filter.countCQL3Rows() || command.filter.columns.size() <= pageSize; - - if (isExhausted()) - return Collections.<Row>emptyList(); - - queried = true; - return localQuery - ? Collections.singletonList(command.getRow(Keyspace.open(command.ksName))) - : StorageProxy.read(Collections.<ReadCommand>singletonList(command), consistencyLevel, state); - } - - public int maxRemaining() - { - if (queried) - return 0; - - return command.filter.countCQL3Rows() ? 1 : command.filter.columns.size(); - } - - public long timestamp() - { - return command.timestamp; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/Pageable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/Pageable.java b/src/java/org/apache/cassandra/service/pager/Pageable.java deleted file mode 100644 index d4986f7..0000000 --- a/src/java/org/apache/cassandra/service/pager/Pageable.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service.pager; - -import java.util.List; - -import org.apache.cassandra.db.ReadCommand; - -/** - * Marker interface for commands that can be paged. 
- */ -public interface Pageable -{ - public static class ReadCommands implements Pageable - { - public final List<ReadCommand> commands; - - public final int limitForQuery; - - public ReadCommands(List<ReadCommand> commands, int limitForQuery) - { - this.commands = commands; - this.limitForQuery = limitForQuery; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/PagingState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java b/src/java/org/apache/cassandra/service/pager/PagingState.java index f168880..685dc3f 100644 --- a/src/java/org/apache/cassandra/service/pager/PagingState.java +++ b/src/java/org/apache/cassandra/service/pager/PagingState.java @@ -31,12 +31,14 @@ public class PagingState public final ByteBuffer partitionKey; public final ByteBuffer cellName; public final int remaining; + public final int remainingInPartition; - public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining) + public PagingState(ByteBuffer partitionKey, ByteBuffer cellName, int remaining, int remainingInPartition) { this.partitionKey = partitionKey == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : partitionKey; this.cellName = cellName == null ? ByteBufferUtil.EMPTY_BYTE_BUFFER : cellName; this.remaining = remaining; + this.remainingInPartition = remainingInPartition; } public static PagingState deserialize(ByteBuffer bytes) @@ -50,7 +52,12 @@ public class PagingState ByteBuffer pk = ByteBufferUtil.readWithShortLength(in); ByteBuffer cn = ByteBufferUtil.readWithShortLength(in); int remaining = in.readInt(); - return new PagingState(pk, cn, remaining); + // Note that while 'in.available()' is theoretically an estimate of how many bytes are available + // without blocking, we know that since we're reading a ByteBuffer it will be exactly how many + // bytes remain to be read. And the reason we want to condition this is for backward compatibility + // as we used to not set this. + int remainingInPartition = in.available() > 0 ?
in.readInt() : Integer.MAX_VALUE; + return new PagingState(pk, cn, remaining, remainingInPartition); } catch (IOException e) { @@ -65,6 +72,7 @@ public class PagingState ByteBufferUtil.writeWithShortLength(partitionKey, out); ByteBufferUtil.writeWithShortLength(cellName, out); out.writeInt(remaining); + out.writeInt(remainingInPartition); return out.buffer(); } catch (IOException e) @@ -77,12 +85,16 @@ public class PagingState { return 2 + partitionKey.remaining() + 2 + cellName.remaining() - + 4; + + 8; // remaining & remainingInPartition } @Override public String toString() { - return String.format("PagingState(key=%s, cellname=%s, remaining=%d", ByteBufferUtil.bytesToHex(partitionKey), ByteBufferUtil.bytesToHex(cellName), remaining); + return String.format("PagingState(key=%s, cellname=%s, remaining=%d, remainingInPartition=%d", + ByteBufferUtil.bytesToHex(partitionKey), + ByteBufferUtil.bytesToHex(cellName), + remaining, + remainingInPartition); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java index ab2dad7..a69335d 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java @@ -17,11 +17,13 @@ */ package org.apache.cassandra.service.pager; -import java.util.List; - -import org.apache.cassandra.db.Row; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.ReadOrderGroup; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; /** * Perform a query, paging it by page of a given size. @@ -44,13 +46,69 @@ import org.apache.cassandra.exceptions.RequestValidationException; */ public interface QueryPager { + public static final QueryPager EMPTY = new QueryPager() + { + public ReadOrderGroup startOrderGroup() + { + return ReadOrderGroup.emptyGroup(); + } + + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException + { + return PartitionIterators.EMPTY; + } + + public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup orderGroup) throws RequestValidationException, RequestExecutionException + { + return PartitionIterators.EMPTY; + } + + public boolean isExhausted() + { + return true; + } + + public int maxRemaining() + { + return 0; + } + + public PagingState state() + { + return null; + } + }; + /** * Fetches the next page. * * @param pageSize the maximum number of elements to return in the next page. + * @param consistency the consistency level to achieve for the query. + * @param clientState the {@code ClientState} for the query. In practice, this can be null unless + * {@code consistency} is a serial consistency. + * @return the page of result. + */ + public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException; + + /** + * Starts a new read operation. 
+ * <p> + * This must be called before {@link fetchPageInternal} and passed to it to protect the read. + * The returned object <b>must</b> be closed on all paths and it is thus strongly advised to + * use it in a try-with-resources construction. + * + * @return a newly started order group for this {@code QueryPager}. + */ + public ReadOrderGroup startOrderGroup(); + + /** + * Fetches the next page internally (in other words, this does a local query). + * + * @param pageSize the maximum number of elements to return in the next page. + * @param orderGroup the {@code ReadOrderGroup} protecting the read. * @return the page of result. */ - public List<Row> fetchPage(int pageSize) throws RequestValidationException, RequestExecutionException; /** * Whether or not this pager is exhausted, i.e. whether or not a call to http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/QueryPagers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java index f933ccb..618ca32 100644 --- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java +++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java @@ -17,180 +17,47 @@ */ package org.apache.cassandra.service.pager; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; - -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnCounter; -import org.apache.cassandra.db.filter.NamesQueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.columniterator.IdentityQueryFilter; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.service.ClientState; /** - * Static utility methods to create query pagers. + * Static utility methods for paging. */ public class QueryPagers { private QueryPagers() {}; - private static int maxQueried(ReadCommand command) - { - if (command instanceof SliceByNamesReadCommand) - { - NamesQueryFilter filter = ((SliceByNamesReadCommand)command).filter; - return filter.countCQL3Rows() ? 1 : filter.columns.size(); - } - else - { - SliceQueryFilter filter = ((SliceFromReadCommand)command).filter; - return filter.count; - } - } - - public static boolean mayNeedPaging(Pageable command, int pageSize) - { - if (command instanceof Pageable.ReadCommands) - { - List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands; - - // Using long on purpose, as we could overflow otherwise - long maxQueried = 0; - for (ReadCommand readCmd : commands) - maxQueried += maxQueried(readCmd); - - return maxQueried > pageSize; - } - else if (command instanceof ReadCommand) - { - return maxQueried((ReadCommand)command) > pageSize; - } - else - { - assert command instanceof RangeSliceCommand; - RangeSliceCommand rsc = (RangeSliceCommand)command; - // We don't support paging for thrift in general because the way thrift RangeSliceCommand count rows - // independently of cells makes things harder (see RangeSliceQueryPager).
The one case where we do - // get a RangeSliceCommand from CQL3 without the countCQL3Rows flag set is for DISTINCT. In that case - // however, the underlying sliceQueryFilter count is 1, so that the RSC limit is still a limit on the - // number of CQL3 rows returned. - assert rsc.countCQL3Rows || (rsc.predicate instanceof SliceQueryFilter && ((SliceQueryFilter)rsc.predicate).count == 1); - return rsc.maxResults > pageSize; - } - } - - private static QueryPager pager(ReadCommand command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state) - { - if (command instanceof SliceByNamesReadCommand) - return new NamesQueryPager((SliceByNamesReadCommand)command, consistencyLevel, cState, local); - else - return new SliceQueryPager((SliceFromReadCommand)command, consistencyLevel, cState, local, state); - } - - private static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, boolean local, PagingState state) - { - if (command instanceof Pageable.ReadCommands) - { - List<ReadCommand> commands = ((Pageable.ReadCommands)command).commands; - if (commands.size() == 1) - return pager(commands.get(0), consistencyLevel, cState, local, state); - - return new MultiPartitionPager(commands, consistencyLevel, cState, local, state, ((Pageable.ReadCommands) command).limitForQuery); - } - else if (command instanceof ReadCommand) - { - return pager((ReadCommand)command, consistencyLevel, cState, local, state); - } - else - { - assert command instanceof RangeSliceCommand; - RangeSliceCommand rangeCommand = (RangeSliceCommand)command; - if (rangeCommand.predicate instanceof NamesQueryFilter) - return new RangeNamesQueryPager(rangeCommand, consistencyLevel, local, state); - else - return new RangeSliceQueryPager(rangeCommand, consistencyLevel, local, state); - } - } - - public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState) - { - return pager(command, consistencyLevel, cState, false, null); - } - - public static QueryPager pager(Pageable command, ConsistencyLevel consistencyLevel, ClientState cState, PagingState state) - { - return pager(command, consistencyLevel, cState, false, state); - } - - public static QueryPager localPager(Pageable command) - { - return pager(command, null, null, true, null); - } - - /** - * Convenience method to (locally) page an internal row. - * Used to 2ndary index a wide row without dying. - */ - public static Iterator<ColumnFamily> pageRowLocally(final ColumnFamilyStore cfs, ByteBuffer key, final int pageSize) - { - SliceFromReadCommand command = new SliceFromReadCommand(cfs.metadata.ksName, key, cfs.name, System.currentTimeMillis(), new IdentityQueryFilter()); - final SliceQueryPager pager = new SliceQueryPager(command, null, null, true); - - return new Iterator<ColumnFamily>() - { - // We don't use AbstractIterator because we don't want hasNext() to do an actual query - public boolean hasNext() - { - return !pager.isExhausted(); - } - - public ColumnFamily next() - { - try - { - List<Row> rows = pager.fetchPage(pageSize); - ColumnFamily cf = rows.isEmpty() ? null : rows.get(0).cf; - return cf == null ? ArrayBackedSortedColumns.factory.create(cfs.metadata) : cf; - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - /** * Convenience method that count (live) cells/rows for a given slice of a row, but page underneath. 
*/ - public static int countPaged(String keyspace, - String columnFamily, - ByteBuffer key, - SliceQueryFilter filter, + public static int countPaged(CFMetaData metadata, + DecoratedKey key, + ColumnFilter columnFilter, + ClusteringIndexFilter filter, + DataLimits limits, ConsistencyLevel consistencyLevel, - ClientState cState, + ClientState state, final int pageSize, - long now) throws RequestValidationException, RequestExecutionException + int nowInSec, + boolean isForThrift) throws RequestValidationException, RequestExecutionException { - SliceFromReadCommand command = new SliceFromReadCommand(keyspace, key, columnFamily, now, filter); - final SliceQueryPager pager = new SliceQueryPager(command, consistencyLevel, cState, false); + SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter); + final SinglePartitionPager pager = new SinglePartitionPager(command, null); - ColumnCounter counter = filter.columnCounter(Schema.instance.getCFMetaData(keyspace, columnFamily).comparator, now); + int count = 0; while (!pager.isExhausted()) { - List<Row> next = pager.fetchPage(pageSize); - if (!next.isEmpty()) - counter.countAll(next.get(0).cf); + try (CountingPartitionIterator iter = new CountingPartitionIterator(pager.fetchPage(pageSize, consistencyLevel, state), limits, nowInSec)) + { + PartitionIterators.consume(iter); + count += iter.counter().counted(); + } } - return counter.live(); + return count; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java index 50d1280..fffb4e1 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeNamesQueryPager.java @@ -17,13 +17,11 @@ */ package org.apache.cassandra.service.pager; -import java.util.List; - import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.NamesQueryFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; /** @@ -37,25 +35,17 @@ import org.apache.cassandra.service.StorageService; */ public class RangeNamesQueryPager extends AbstractQueryPager { - private final RangeSliceCommand command; private volatile DecoratedKey lastReturnedKey; - // Don't use directly, use QueryPagers method instead - RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery) + public RangeNamesQueryPager(PartitionRangeReadCommand command, PagingState state) { - super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp); - this.command = command; - assert columnFilter instanceof NamesQueryFilter && ((NamesQueryFilter)columnFilter).countCQL3Rows(); - } - - RangeNamesQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state) - { - this(command, consistencyLevel, localQuery); + super(command); + assert command.isNamesQuery(); if (state != null) { lastReturnedKey = 
StorageService.getPartitioner().decorateKey(state.partitionKey); - restoreState(state.remaining, true); + restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); } } @@ -63,51 +53,36 @@ public class RangeNamesQueryPager extends AbstractQueryPager { return lastReturnedKey == null ? null - : new PagingState(lastReturnedKey.getKey(), null, maxRemaining()); + : new PagingState(lastReturnedKey.getKey(), null, maxRemaining(), remainingInPartition()); } - protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) + protected ReadCommand nextPageReadCommand(int pageSize) throws RequestExecutionException { - AbstractRangeCommand pageCmd = command.withUpdatedLimit(pageSize); + PartitionRangeReadCommand pageCmd = ((PartitionRangeReadCommand)command).withUpdatedLimit(command.limits().forPaging(pageSize)); if (lastReturnedKey != null) pageCmd = pageCmd.forSubRange(makeExcludingKeyBounds(lastReturnedKey)); - return localQuery - ? pageCmd.executeLocally() - : StorageProxy.getRangeSlice(pageCmd, consistencyLevel); - } - - protected boolean containsPreviousLast(Row first) - { - // When querying the next page, we create a bound that exclude the lastReturnedKey - return false; - } - - protected boolean recordLast(Row last) - { - lastReturnedKey = last.key; - // We return false as that means "can that last be in the next query?" - return false; + return pageCmd; } - protected boolean isReversed() + protected void recordLast(DecoratedKey key, Row last) { - return false; + lastReturnedKey = key; } - private AbstractBounds<RowPosition> makeExcludingKeyBounds(RowPosition lastReturnedKey) + private AbstractBounds<PartitionPosition> makeExcludingKeyBounds(PartitionPosition lastReturnedKey) { // We return a range that always exclude lastReturnedKey, since we've already // returned it. 
- AbstractBounds<RowPosition> bounds = command.keyRange; + AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); if (bounds instanceof Range || bounds instanceof Bounds) { - return new Range<RowPosition>(lastReturnedKey, bounds.right); + return new Range<PartitionPosition>(lastReturnedKey, bounds.right); } else { - return new ExcludingBounds<RowPosition>(lastReturnedKey, bounds.right); + return new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java index c9a28e8..6429be0 100644 --- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java @@ -17,17 +17,16 @@ */ package org.apache.cassandra.service.pager; -import java.util.List; - import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.SliceQueryFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Pages a RangeSliceCommand whose predicate is a slice query. * @@ -36,27 +35,21 @@ import org.apache.cassandra.service.StorageService; */ public class RangeSliceQueryPager extends AbstractQueryPager { - private final RangeSliceCommand command; - private volatile DecoratedKey lastReturnedKey; - private volatile CellName lastReturnedName; + private static final Logger logger = LoggerFactory.getLogger(RangeSliceQueryPager.class); - // Don't use directly, use QueryPagers method instead - RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery) - { - super(consistencyLevel, command.maxResults, localQuery, command.keyspace, command.columnFamily, command.predicate, command.timestamp); - this.command = command; - assert columnFilter instanceof SliceQueryFilter; - } + private volatile DecoratedKey lastReturnedKey; + private volatile Clustering lastReturnedClustering; - RangeSliceQueryPager(RangeSliceCommand command, ConsistencyLevel consistencyLevel, boolean localQuery, PagingState state) + public RangeSliceQueryPager(PartitionRangeReadCommand command, PagingState state) { - this(command, consistencyLevel, localQuery); + super(command); + assert !command.isNamesQuery(); if (state != null) { lastReturnedKey = StorageService.getPartitioner().decorateKey(state.partitionKey); - lastReturnedName = cfm.comparator.cellFromByteBuffer(state.cellName); - restoreState(state.remaining, true); + lastReturnedClustering = LegacyLayout.decodeClustering(command.metadata(), state.cellName); + restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); } } @@ -64,67 +57,63 @@ public class RangeSliceQueryPager extends AbstractQueryPager { return lastReturnedKey == null ? 
null - : new PagingState(lastReturnedKey.getKey(), lastReturnedName.toByteBuffer(), maxRemaining()); + : new PagingState(lastReturnedKey.getKey(), LegacyLayout.encodeClustering(command.metadata(), lastReturnedClustering), maxRemaining(), remainingInPartition()); } - protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) + protected ReadCommand nextPageReadCommand(int pageSize) throws RequestExecutionException { - SliceQueryFilter sf = (SliceQueryFilter)columnFilter; - AbstractBounds<RowPosition> keyRange = lastReturnedKey == null ? command.keyRange : makeIncludingKeyBounds(lastReturnedKey); - Composite start = lastReturnedName == null ? sf.start() : lastReturnedName; - PagedRangeCommand pageCmd = new PagedRangeCommand(command.keyspace, - command.columnFamily, - command.timestamp, - keyRange, - sf, - start, - sf.finish(), - command.rowFilter, - pageSize, - command.countCQL3Rows); - - return localQuery - ? pageCmd.executeLocally() - : StorageProxy.getRangeSlice(pageCmd, consistencyLevel); - } - - protected boolean containsPreviousLast(Row first) - { - if (lastReturnedKey == null || !lastReturnedKey.equals(first.key)) - return false; - - // Same as SliceQueryPager, we ignore a deleted column - Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf); - return !first.cf.deletionInfo().isDeleted(firstCell) - && firstCell.isLive(timestamp()) - && lastReturnedName.equals(firstCell.name()); - } + DataLimits limits; + DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange(); + DataRange pageRange; + if (lastReturnedKey == null) + { + pageRange = fullRange; + limits = command.limits().forPaging(pageSize); + } + else + { + // We want to include the last returned key only if we haven't achieved our per-partition limit, otherwise, don't bother. + boolean includeLastKey = remainingInPartition() > 0; + AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey); + if (includeLastKey) + { + pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedClustering, false); + limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); + } + else + { + pageRange = fullRange.forSubRange(bounds); + limits = command.limits().forPaging(pageSize); + } + } - protected boolean recordLast(Row last) - { - lastReturnedKey = last.key; - lastReturnedName = (isReversed() ? 
firstNonStaticCell(last.cf) : lastCell(last.cf)).name(); - return true; + return new PartitionRangeReadCommand(command.metadata(), command.nowInSec(), command.columnFilter(), command.rowFilter(), limits, pageRange); } - protected boolean isReversed() + protected void recordLast(DecoratedKey key, Row last) { - return ((SliceQueryFilter)command.predicate).reversed; + if (last != null) + { + lastReturnedKey = key; + lastReturnedClustering = last.clustering().takeAlias(); + } } - private AbstractBounds<RowPosition> makeIncludingKeyBounds(RowPosition lastReturnedKey) + private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey) { - // We always include lastReturnedKey since we may still be paging within a row, - // and PagedRangeCommand will move over if we're not anyway - AbstractBounds<RowPosition> bounds = command.keyRange; + AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); if (bounds instanceof Range || bounds instanceof Bounds) { - return new Bounds<RowPosition>(lastReturnedKey, bounds.right); + return includeLastKey + ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right) + : new Range<PartitionPosition>(lastReturnedKey, bounds.right); } else { - return new IncludingExcludingBounds<RowPosition>(lastReturnedKey, bounds.right); + return includeLastKey + ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right) + : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index 51bbf90..6488641 100644 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@ -19,15 +19,67 @@ package org.apache.cassandra.service.pager; import java.nio.ByteBuffer; -import org.apache.cassandra.db.filter.ColumnCounter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.service.ClientState; /** * Common interface to single partition queries (by slice and by name). * * For use by MultiPartitionPager. 
*/ -public interface SinglePartitionPager extends QueryPager +public class SinglePartitionPager extends AbstractQueryPager { - public ByteBuffer key(); - public ColumnCounter columnCounter(); + private static final Logger logger = LoggerFactory.getLogger(SinglePartitionPager.class); + + private final SinglePartitionReadCommand<?> command; + + private volatile Clustering lastReturned; + + public SinglePartitionPager(SinglePartitionReadCommand<?> command, PagingState state) + { + super(command); + this.command = command; + + if (state != null) + { + lastReturned = LegacyLayout.decodeClustering(command.metadata(), state.cellName); + restoreState(command.partitionKey(), state.remaining, state.remainingInPartition); + } + } + + public ByteBuffer key() + { + return command.partitionKey().getKey(); + } + + public DataLimits limits() + { + return command.limits(); + } + + public PagingState state() + { + return lastReturned == null + ? null + : new PagingState(null, LegacyLayout.encodeClustering(command.metadata(), lastReturned), maxRemaining(), remainingInPartition()); + } + + protected ReadCommand nextPageReadCommand(int pageSize) + { + return command.forPaging(lastReturned, pageSize); + } + + protected void recordLast(DecoratedKey key, Row last) + { + if (last != null) + lastReturned = last.clustering().takeAlias(); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java deleted file mode 100644 index bc364aa..0000000 --- a/src/java/org/apache/cassandra/service/pager/SliceQueryPager.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.service.pager; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.List; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.exceptions.RequestValidationException; -import org.apache.cassandra.exceptions.RequestExecutionException; -import org.apache.cassandra.service.ClientState; -import org.apache.cassandra.service.StorageProxy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Pager over a SliceFromReadCommand. 
- */ -public class SliceQueryPager extends AbstractQueryPager implements SinglePartitionPager -{ - private static final Logger logger = LoggerFactory.getLogger(SliceQueryPager.class); - - private final SliceFromReadCommand command; - private final ClientState cstate; - - private volatile Composite lastReturned; - - // Don't use directly, use QueryPagers method instead - SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery) - { - super(consistencyLevel, command.filter.count, localQuery, command.ksName, command.cfName, command.filter, command.timestamp); - this.command = command; - this.cstate = cstate; - } - - SliceQueryPager(SliceFromReadCommand command, ConsistencyLevel consistencyLevel, ClientState cstate, boolean localQuery, PagingState state) - { - this(command, consistencyLevel, cstate, localQuery); - - if (state != null) - { - lastReturned = cfm.comparator.fromByteBuffer(state.cellName); - restoreState(state.remaining, true); - } - } - - public ByteBuffer key() - { - return command.key; - } - - public PagingState state() - { - return lastReturned == null - ? null - : new PagingState(null, lastReturned.toByteBuffer(), maxRemaining()); - } - - protected List<Row> queryNextPage(int pageSize, ConsistencyLevel consistencyLevel, boolean localQuery) - throws RequestValidationException, RequestExecutionException - { - // For some queries, such as a DISTINCT query on static columns, the limit for slice queries will be lower - // than the page size (in the static example, it will be 1). We use the min here to ensure we don't fetch - // more rows than we're supposed to. See CASSANDRA-8108 for more details. - SliceQueryFilter filter = command.filter.withUpdatedCount(Math.min(command.filter.count, pageSize)); - if (lastReturned != null) - filter = filter.withUpdatedStart(lastReturned, cfm); - - logger.debug("Querying next page of slice query; new filter: {}", filter); - ReadCommand pageCmd = command.withUpdatedFilter(filter); - return localQuery - ? Collections.singletonList(pageCmd.getRow(Keyspace.open(command.ksName))) - : StorageProxy.read(Collections.singletonList(pageCmd), consistencyLevel, cstate); - } - - protected boolean containsPreviousLast(Row first) - { - if (lastReturned == null) - return false; - - Cell firstCell = isReversed() ? lastCell(first.cf) : firstNonStaticCell(first.cf); - // Note: we only return true if the column is the lastReturned *and* it is live. If it is deleted, it is ignored by the - // rest of the paging code (it hasn't been counted as live in particular) and we want to act as if it wasn't there. - return !first.cf.deletionInfo().isDeleted(firstCell) - && firstCell.isLive(timestamp()) - && lastReturned.equals(firstCell.name()); - } - - protected boolean recordLast(Row last) - { - Cell lastCell = isReversed() ? 
firstNonStaticCell(last.cf) : lastCell(last.cf); - lastReturned = lastCell.name(); - return true; - } - - protected boolean isReversed() - { - return command.filter.reversed; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/Commit.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 45d04f9..6077166 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -24,14 +24,18 @@ package org.apache.cassandra.service.paxos; import java.io.DataInput; import java.io.IOException; import java.util.UUID; -import java.nio.ByteBuffer; import com.google.common.base.Objects; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.UUIDSerializer; @@ -40,34 +44,31 @@ public class Commit { public static final CommitSerializer serializer = new CommitSerializer(); - public final ByteBuffer key; public final UUID ballot; - public final ColumnFamily update; + public final PartitionUpdate update; - public Commit(ByteBuffer key, UUID ballot, ColumnFamily update) + public Commit(UUID ballot, PartitionUpdate update) { - assert key != null; assert ballot != null; assert update != null; - this.key = key; this.ballot = ballot; this.update = update; } - public static Commit newPrepare(ByteBuffer key, CFMetaData metadata, UUID ballot) + public static Commit newPrepare(DecoratedKey key, CFMetaData metadata, UUID ballot) { - return new Commit(key, ballot, ArrayBackedSortedColumns.factory.create(metadata)); + return new Commit(ballot, PartitionUpdate.emptyUpdate(metadata, key)); } - public static Commit newProposal(ByteBuffer key, UUID ballot, ColumnFamily update) + public static Commit newProposal(UUID ballot, PartitionUpdate update) { - return new Commit(key, ballot, updatesWithPaxosTime(update, ballot)); + return new Commit(ballot, updatesWithPaxosTime(update, ballot)); } - public static Commit emptyCommit(ByteBuffer key, CFMetaData metadata) + public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata) { - return new Commit(key, UUIDGen.minTimeUUID(0), ArrayBackedSortedColumns.factory.create(metadata)); + return new Commit(UUIDGen.minTimeUUID(0), PartitionUpdate.emptyUpdate(metadata, key)); } public boolean isAfter(Commit other) @@ -83,7 +84,7 @@ public class Commit public Mutation makeMutation() { assert update != null; - return new Mutation(key, update); + return new Mutation(update); } @Override @@ -95,7 +96,6 @@ public class Commit Commit commit = (Commit) o; if (!ballot.equals(commit.ballot)) return false; - if (!key.equals(commit.key)) return false; if (!update.equals(commit.update)) return false; return true; @@ -104,52 +104,88 @@ public class Commit @Override public int hashCode() { - return Objects.hashCode(key, ballot, update); + return Objects.hashCode(ballot, update); } - private static ColumnFamily 
updatesWithPaxosTime(ColumnFamily updates, UUID ballot) + private static PartitionUpdate updatesWithPaxosTime(PartitionUpdate update, UUID ballot) { - ColumnFamily cf = updates.cloneMeShallow(); long t = UUIDGen.microsTimestamp(ballot); - // For the tombstones, we use t-1 so that when insert a collection literall, the range tombstone that deletes the previous values of - // the collection and we want that to have a lower timestamp and our new values. Since tombstones wins over normal insert, using t-1 - // should not be a problem in general (see #6069). - cf.deletionInfo().updateAllTimestamp(t-1); - for (Cell cell : updates) - cf.addAtom(cell.withUpdatedTimestamp(t)); - return cf; + // Using t-1 for tombstones so deletion doesn't trump newly inserted data (#6069) + PartitionUpdate newUpdate = new PartitionUpdate(update.metadata(), + update.partitionKey(), + update.deletionInfo().updateAllTimestamp(t-1), + update.columns(), + update.rowCount()); + + if (!update.staticRow().isEmpty()) + copyWithUpdatedTimestamp(update.staticRow(), newUpdate.staticWriter(), t); + + for (Row row : update) + copyWithUpdatedTimestamp(row, newUpdate.writer(), t); + + return newUpdate; + } + + private static void copyWithUpdatedTimestamp(Row row, Row.Writer writer, long timestamp) + { + Rows.writeClustering(row.clustering(), writer); + writer.writePartitionKeyLivenessInfo(row.primaryKeyLivenessInfo().withUpdatedTimestamp(timestamp)); + writer.writeRowDeletion(row.deletion()); + + for (Cell cell : row) + writer.writeCell(cell.column(), cell.isCounterCell(), cell.value(), cell.livenessInfo().withUpdatedTimestamp(timestamp), cell.path()); + + for (int i = 0; i < row.columns().complexColumnCount(); i++) + { + ColumnDefinition c = row.columns().getComplex(i); + DeletionTime dt = row.getDeletion(c); + // We use t-1 to make sure that on inserting a collection literal, the deletion that comes with it does not + // end up deleting the inserted data (see #6069) + if (!dt.isLive()) + writer.writeComplexDeletion(c, new SimpleDeletionTime(timestamp-1, dt.localDeletionTime())); + } + writer.endOfRow(); } @Override public String toString() { - return String.format("Commit(%s, %s, %s)", ByteBufferUtil.bytesToHex(key), ballot, update); + return String.format("Commit(%s, %s)", ballot, update); } public static class CommitSerializer implements IVersionedSerializer<Commit> { public void serialize(Commit commit, DataOutputPlus out, int version) throws IOException { - ByteBufferUtil.writeWithShortLength(commit.key, out); + if (version < MessagingService.VERSION_30) + ByteBufferUtil.writeWithShortLength(commit.update.partitionKey().getKey(), out); + UUIDSerializer.serializer.serialize(commit.ballot, out, version); - ColumnFamily.serializer.serialize(commit.update, out, version); + PartitionUpdate.serializer.serialize(commit.update, out, version); } public Commit deserialize(DataInput in, int version) throws IOException { - return new Commit(ByteBufferUtil.readWithShortLength(in), - UUIDSerializer.serializer.deserialize(in, version), - ColumnFamily.serializer.deserialize(in, - ArrayBackedSortedColumns.factory, - ColumnSerializer.Flag.LOCAL, - version)); + DecoratedKey key = null; + if (version < MessagingService.VERSION_30) + key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in)); + + UUID ballot = UUIDSerializer.serializer.deserialize(in, version); + PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL, key); + return new Commit(ballot, update); } 
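        // Illustrative aside, not part of this commit: the version gate above gives the
        // wire layout [short key length][key bytes][ballot][update] before VERSION_30,
        // and just [ballot][update] from 3.0 on, since the PartitionUpdate now carries
        // its own partition key. A hedged round-trip sketch (assumes DataOutputBuffer /
        // DataInputStream plumbing; variable names are illustrative only):
        //
        //   DataOutputBuffer out = new DataOutputBuffer();
        //   Commit.serializer.serialize(commit, out, MessagingService.VERSION_30);
        //   DataInput in = new DataInputStream(new ByteArrayInputStream(out.getData()));
        //   Commit roundTripped = Commit.serializer.deserialize(in, MessagingService.VERSION_30);
        //   assert commit.equals(roundTripped); // equals() now compares only ballot and update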
public long serializedSize(Commit commit, int version) { - return 2 + commit.key.remaining() - + UUIDSerializer.serializer.serializedSize(commit.ballot, version) - + ColumnFamily.serializer.serializedSize(commit.update, version); + TypeSizes sizes = TypeSizes.NATIVE; + + int size = 0; + if (version < MessagingService.VERSION_30) + size += ByteBufferUtil.serializedSizeWithShortLength(commit.update.partitionKey().getKey(), sizes); + + return size + + UUIDSerializer.serializer.serializedSize(commit.ballot, version) + + PartitionUpdate.serializer.serializedSize(commit.update, version, sizes); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PaxosState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java index 01e03f4..20ccb90 100644 --- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java +++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java @@ -39,14 +39,14 @@ public class PaxosState private final Commit accepted; private final Commit mostRecentCommit; - public PaxosState(ByteBuffer key, CFMetaData metadata) + public PaxosState(DecoratedKey key, CFMetaData metadata) { this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata)); } public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit) { - assert promised.key == accepted.key && accepted.key == mostRecentCommit.key; + assert promised.update.partitionKey().equals(accepted.update.partitionKey()) && accepted.update.partitionKey().equals(mostRecentCommit.update.partitionKey()); assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata(); this.promised = promised; @@ -59,11 +59,11 @@ public class PaxosState long start = System.nanoTime(); try { - Lock lock = LOCKS.get(toPrepare.key); + Lock lock = LOCKS.get(toPrepare.update.partitionKey()); lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata()); + PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata()); if (toPrepare.isAfter(state.promised)) { Tracing.trace("Promising ballot {}", toPrepare.ballot); @@ -94,11 +94,11 @@ public class PaxosState long start = System.nanoTime(); try { - Lock lock = LOCKS.get(proposal.key); + Lock lock = LOCKS.get(proposal.update.partitionKey()); lock.lock(); try { - PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata()); + PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata()); if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised)) { Tracing.trace("Accepting proposal {}", proposal); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java index a446b0b..7b5edf2 100644 --- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java +++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap; import 
com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +47,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse> private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>(); - public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets, ConsistencyLevel consistency) + public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency) { super(targets, consistency); // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
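
A note on the per-key locking visible in the PaxosState hunks above: prepare() and propose() serialize all Paxos state transitions for a partition by locking on the partition key before loading and saving state, with the key now taken from the update itself (e.g. proposal.update.partitionKey()) instead of a separate Commit field. Below is a minimal, self-contained sketch of that lock discipline, assuming LOCKS is a Guava Striped<Lock> as in the surrounding class; the helper itself is hypothetical, for illustration only:

import java.util.concurrent.locks.Lock;

import com.google.common.util.concurrent.Striped;

class PerKeyLockSketch
{
    // Stand-in for PaxosState.LOCKS: striping bounds memory while still serializing
    // every operation that maps to the same partition key.
    private static final Striped<Lock> LOCKS = Striped.lazyWeakLock(1024);

    static void withKeyLocked(Object partitionKey, Runnable criticalSection)
    {
        Lock lock = LOCKS.get(partitionKey);
        lock.lock();
        try
        {
            // e.g. load the saved Paxos state, compare ballots, persist the outcome
            criticalSection.run();
        }
        finally
        {
            lock.unlock();
        }
    }
}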
