Fix CQL3 LIMIT. Patch by slebresne; reviewed by jbellis for CASSANDRA-4877
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a32eb9f7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a32eb9f7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a32eb9f7 Branch: refs/heads/cassandra-1.2 Commit: a32eb9f7d2f2868e8154d178e96e045859e1d855 Parents: e39bf7a Author: Sylvain Lebresne <[email protected]> Authored: Wed Nov 21 14:03:32 2012 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Nov 21 14:03:32 2012 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/cql3/statements/SelectStatement.java | 11 ++--- .../cassandra/db/AbstractColumnContainer.java | 9 +-- .../apache/cassandra/db/CollationController.java | 5 +- .../org/apache/cassandra/db/ColumnFamilyStore.java | 8 ++-- .../org/apache/cassandra/db/RangeSliceCommand.java | 20 ++++---- src/java/org/apache/cassandra/db/Row.java | 11 ++-- .../org/apache/cassandra/db/SliceQueryPager.java | 5 +- .../apache/cassandra/db/filter/ExtendedFilter.java | 38 +++++++------- .../cassandra/db/filter/IDiskAtomFilter.java | 12 +++-- .../cassandra/db/filter/NamesQueryFilter.java | 41 ++++++++++++++- .../cassandra/db/filter/SliceQueryFilter.java | 12 ++++- .../cassandra/db/index/SecondaryIndexManager.java | 4 +- .../cassandra/db/index/SecondaryIndexSearcher.java | 2 +- .../db/index/composites/CompositesSearcher.java | 8 ++-- .../cassandra/db/index/keys/KeysSearcher.java | 4 +- .../cassandra/service/RangeSliceVerbHandler.java | 4 +- .../cassandra/service/RowRepairResolver.java | 6 +- .../org/apache/cassandra/service/StorageProxy.java | 24 +++++---- .../apache/cassandra/db/ColumnFamilyStoreTest.java | 2 +- 20 files changed, 139 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cdd651a..60e4c94 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -18,6 +18,7 @@ * Skip repair on system_trace and keyspaces with RF=1 (CASSANDRA-4956) * Remove select arbitrary limit (CASSANDRA-4918) * Correctly handle prepared operation on collections (CASSANDRA-4945) + * Fix CQL3 LIMIT (CASSANDRA-4877) Merged from 1.1: * add basic authentication support for Pig CassandraStorage (CASSANDRA-3042) * fix CQL2 ALTER TABLE compaction_strategy_class altering (CASSANDRA-4965) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 5963e0e..4ae2b55 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -236,10 +236,7 @@ public class SelectStatement implements CQLStatement IDiskAtomFilter filter = makeFilter(variables); List<IndexExpression> expressions = getIndexExpressions(variables); // The LIMIT provided by the user is the number of CQL row he wants returned. - // For NamesQueryFilter, this is the number of internal rows returned, since a NamesQueryFilter can only select one CQL row in a given internal row. - // For SliceQueryFilter however, we want to have getRangeSlice to count the number of columns, not the number of keys. Then - // SliceQueryFilter.collectReducedColumns will correctly columns having the same composite prefix using ColumnCounter. - boolean maxIsColumns = filter instanceof SliceQueryFilter; + // We want to have getRangeSlice to count the number of columns, not the number of keys. 
return new RangeSliceCommand(keyspace(), columnFamily(), null, @@ -247,7 +244,7 @@ public class SelectStatement implements CQLStatement getKeyBounds(variables), expressions, getLimit(), - maxIsColumns, + true, false); } @@ -320,7 +317,7 @@ public class SelectStatement implements CQLStatement { SortedSet<ByteBuffer> columnNames = getRequestedColumns(variables); QueryProcessor.validateColumnNames(columnNames); - return new NamesQueryFilter(columnNames); + return new NamesQueryFilter(columnNames, true); } } @@ -813,7 +810,7 @@ public class SelectStatement implements CQLStatement } else { - if (row.cf.getLiveColumnCount() == 0) + if (row.cf.hasOnlyTombstones()) continue; // Static case: One cqlRow for all columns http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/AbstractColumnContainer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java index ab93c54..09d0a38 100644 --- a/src/java/org/apache/cassandra/db/AbstractColumnContainer.java +++ b/src/java/org/apache/cassandra/db/AbstractColumnContainer.java @@ -164,17 +164,14 @@ public abstract class AbstractColumnContainer implements IColumnContainer, IIter return getColumnCount(); } - public int getLiveColumnCount() + public boolean hasOnlyTombstones() { - int count = 0; - for (IColumn column : columns) { if (column.isLive()) - count++; + return false; } - - return count; + return true; } public Iterator<IColumn> iterator() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/CollationController.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CollationController.java b/src/java/org/apache/cassandra/db/CollationController.java index 73c675f..7160b62 100644 --- 
a/src/java/org/apache/cassandra/db/CollationController.java +++ b/src/java/org/apache/cassandra/db/CollationController.java @@ -99,8 +99,9 @@ public class CollationController // avoid changing the filter columns of the original filter // (reduceNameFilter removes columns that are known to be irrelevant) - TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(((NamesQueryFilter) filter.filter).columns); - QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, new NamesQueryFilter(filterColumns)); + NamesQueryFilter namesFilter = (NamesQueryFilter) filter.filter; + TreeSet<ByteBuffer> filterColumns = new TreeSet<ByteBuffer>(namesFilter.columns); + QueryFilter reducedFilter = new QueryFilter(filter.key, filter.path, namesFilter.withUpdatedColumns(filterColumns)); /* add the SSTables on disk */ Collections.sort(view.sstables, SSTable.maxTimestampComparator); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 439ef5f..7e4355a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1454,9 +1454,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return getRangeSlice(superColumn, range, maxResults, columnFilter, rowFilter, false, false); } - public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean maxIsColumns, boolean isPaging) + public List<Row> getRangeSlice(ByteBuffer superColumn, final AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter columnFilter, List<IndexExpression> rowFilter, boolean countCQL3Rows, boolean isPaging) { - return 
filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, maxIsColumns, isPaging)); + return filter(getSequentialIterator(superColumn, range, columnFilter), ExtendedFilter.create(this, columnFilter, rowFilter, maxResults, countCQL3Rows, isPaging)); } public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter) @@ -1464,10 +1464,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return search(clause, range, maxResults, dataFilter, false); } - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows) { Tracing.trace("Executing indexed scan for {}", range.getString(metadata.getKeyValidator())); - return indexManager.search(clause, range, maxResults, dataFilter, maxIsColumns); + return indexManager.search(clause, range, maxResults, dataFilter, countCQL3Rows); } public List<Row> filter(AbstractScanIterator rowIterator, ExtendedFilter filter) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java index b8fdfd6..1748abd 100644 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ b/src/java/org/apache/cassandra/db/RangeSliceCommand.java @@ -83,7 +83,7 @@ public class RangeSliceCommand implements IReadCommand public final AbstractBounds<RowPosition> range; public final int maxResults; - public final boolean maxIsColumns; + public final boolean countCQL3Rows; public final boolean isPaging; 
public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, int maxResults) @@ -101,7 +101,7 @@ public class RangeSliceCommand implements IReadCommand this(keyspace, column_family, super_column, predicate, range, row_filter, maxResults, false, false); } - public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean maxIsColumns, boolean isPaging) + public RangeSliceCommand(String keyspace, String column_family, ByteBuffer super_column, IDiskAtomFilter predicate, AbstractBounds<RowPosition> range, List<IndexExpression> row_filter, int maxResults, boolean countCQL3Rows, boolean isPaging) { this.keyspace = keyspace; this.column_family = column_family; @@ -110,7 +110,7 @@ public class RangeSliceCommand implements IReadCommand this.range = range; this.row_filter = row_filter; this.maxResults = maxResults; - this.maxIsColumns = maxIsColumns; + this.countCQL3Rows = countCQL3Rows; this.isPaging = isPaging; } @@ -130,7 +130,7 @@ public class RangeSliceCommand implements IReadCommand ", range=" + range + ", row_filter =" + row_filter + ", maxResults=" + maxResults + - ", maxIsColumns=" + maxIsColumns + + ", countCQL3Rows=" + countCQL3Rows + '}'; } @@ -143,7 +143,7 @@ public class RangeSliceCommand implements IReadCommand public IndexScanCommand toIndexScanCommand() { assert row_filter != null && !row_filter.isEmpty(); - if (maxIsColumns || isPaging) + if (countCQL3Rows || isPaging) throw new IllegalStateException("Cannot proceed with range query as the remote end has a version < 1.1. 
Please update the full cluster first."); CFMetaData cfm = Schema.instance.getCFMetaData(keyspace, column_family); @@ -240,7 +240,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm dos.writeInt(sliceCommand.maxResults); if (version >= MessagingService.VERSION_11) { - dos.writeBoolean(sliceCommand.maxIsColumns); + dos.writeBoolean(sliceCommand.countCQL3Rows); dos.writeBoolean(sliceCommand.isPaging); } } @@ -297,14 +297,14 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm AbstractBounds<RowPosition> range = AbstractBounds.serializer.deserialize(dis, version).toRowBounds(); int maxResults = dis.readInt(); - boolean maxIsColumns = false; + boolean countCQL3Rows = false; boolean isPaging = false; if (version >= MessagingService.VERSION_11) { - maxIsColumns = dis.readBoolean(); + countCQL3Rows = dis.readBoolean(); isPaging = dis.readBoolean(); } - return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, maxIsColumns, isPaging); + return new RangeSliceCommand(keyspace, columnFamily, superColumn, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging); } public long serializedSize(RangeSliceCommand rsc, int version) @@ -380,7 +380,7 @@ class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceComm size += TypeSizes.NATIVE.sizeof(rsc.maxResults); if (version >= MessagingService.VERSION_11) { - size += TypeSizes.NATIVE.sizeof(rsc.maxIsColumns); + size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows); size += TypeSizes.NATIVE.sizeof(rsc.isPaging); } return size; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Row.java b/src/java/org/apache/cassandra/db/Row.java index 0c129e2..74cd906 100644 --- a/src/java/org/apache/cassandra/db/Row.java +++ 
b/src/java/org/apache/cassandra/db/Row.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db; import java.io.*; +import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.io.IColumnSerializer; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.service.StorageService; @@ -39,11 +40,6 @@ public class Row this.cf = cf; } - public int getLiveColumnCount() - { - return cf == null ? 0 : cf.getLiveColumnCount(); - } - @Override public String toString() { @@ -53,6 +49,11 @@ public class Row ')'; } + public int getLiveCount(IDiskAtomFilter filter) + { + return cf == null ? 0 : filter.getLiveCount(cf); + } + public static class RowSerializer implements IVersionedSerializer<Row> { public void serialize(Row row, DataOutput dos, int version) throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/SliceQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SliceQueryPager.java b/src/java/org/apache/cassandra/db/SliceQueryPager.java index b67c071..0f45f8c 100644 --- a/src/java/org/apache/cassandra/db/SliceQueryPager.java +++ b/src/java/org/apache/cassandra/db/SliceQueryPager.java @@ -52,9 +52,10 @@ public class SliceQueryPager implements Iterator<ColumnFamily> return null; QueryPath path = new QueryPath(cfs.getColumnFamilyName()); - QueryFilter filter = new QueryFilter(key, path, new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE)); + SliceQueryFilter sliceFilter = new SliceQueryFilter(slices, false, DEFAULT_PAGE_SIZE); + QueryFilter filter = new QueryFilter(key, path, sliceFilter); ColumnFamily cf = cfs.getColumnFamily(filter); - if (cf == null || cf.getLiveColumnCount() < DEFAULT_PAGE_SIZE) + if (cf == null || sliceFilter.getLiveCount(cf) < DEFAULT_PAGE_SIZE) { exhausted = true; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java index 65c4563..4772c53 100644 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java @@ -44,35 +44,35 @@ public abstract class ExtendedFilter public final ColumnFamilyStore cfs; protected final IDiskAtomFilter originalFilter; private final int maxResults; - private final boolean maxIsColumns; + private final boolean countCQL3Rows; private final boolean isPaging; - public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns, boolean isPaging) + public static ExtendedFilter create(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows, boolean isPaging) { if (clause == null || clause.isEmpty()) { - return new EmptyClauseFilter(cfs, filter, maxResults, maxIsColumns, isPaging); + return new EmptyClauseFilter(cfs, filter, maxResults, countCQL3Rows, isPaging); } else { if (isPaging) throw new IllegalArgumentException("Cross-row paging is not supported along with index clauses"); return cfs.getComparator() instanceof CompositeType - ? new FilterWithCompositeClauses(cfs, filter, clause, maxResults, maxIsColumns) - : new FilterWithClauses(cfs, filter, clause, maxResults, maxIsColumns); + ? 
new FilterWithCompositeClauses(cfs, filter, clause, maxResults, countCQL3Rows) + : new FilterWithClauses(cfs, filter, clause, maxResults, countCQL3Rows); } } - protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging) + protected ExtendedFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging) { assert cfs != null; assert filter != null; this.cfs = cfs; this.originalFilter = filter; this.maxResults = maxResults; - this.maxIsColumns = maxIsColumns; + this.countCQL3Rows = countCQL3Rows; this.isPaging = isPaging; - if (maxIsColumns) + if (countCQL3Rows) originalFilter.updateColumnsLimit(maxResults); if (isPaging && (!(originalFilter instanceof SliceQueryFilter) || ((SliceQueryFilter)originalFilter).finish().remaining() != 0)) throw new IllegalArgumentException("Cross-row paging is only supported for SliceQueryFilter having an empty finish column"); @@ -80,12 +80,12 @@ public abstract class ExtendedFilter public int maxRows() { - return maxIsColumns ? Integer.MAX_VALUE : maxResults; + return countCQL3Rows ? Integer.MAX_VALUE : maxResults; } public int maxColumns() { - return maxIsColumns ? maxResults : Integer.MAX_VALUE; + return countCQL3Rows ? 
maxResults : Integer.MAX_VALUE; } /** @@ -98,7 +98,7 @@ public abstract class ExtendedFilter if (isPaging) ((SliceQueryFilter)initialFilter()).setStart(ByteBufferUtil.EMPTY_BYTE_BUFFER); - if (!maxIsColumns) + if (!countCQL3Rows) return; int remaining = maxResults - currentColumnsCount; @@ -110,7 +110,7 @@ public abstract class ExtendedFilter if (initialFilter() instanceof SliceQueryFilter) return ((SliceQueryFilter)initialFilter()).lastCounted(); else - return data.getLiveColumnCount(); + return initialFilter().getLiveCount(data); } /** The initial filter we'll do our first slice with (either the original or a superset of it) */ @@ -165,9 +165,9 @@ public abstract class ExtendedFilter protected final List<IndexExpression> clause; protected final IDiskAtomFilter initialFilter; - public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns) + public FilterWithClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean countCQL3Rows) { - super(cfs, filter, maxResults, maxIsColumns, false); + super(cfs, filter, maxResults, countCQL3Rows, false); assert clause != null; this.clause = clause; this.initialFilter = computeInitialFilter(); @@ -201,7 +201,7 @@ public abstract class ExtendedFilter columns.add(expr.column_name); } columns.addAll(((NamesQueryFilter) originalFilter).columns); - return new NamesQueryFilter(columns); + return ((NamesQueryFilter)originalFilter).withUpdatedColumns(columns); } } return originalFilter; @@ -294,9 +294,9 @@ public abstract class ExtendedFilter private static class FilterWithCompositeClauses extends FilterWithClauses { - public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean maxIsColumns) + public FilterWithCompositeClauses(ColumnFamilyStore cfs, IDiskAtomFilter filter, List<IndexExpression> clause, int maxResults, boolean 
countCQL3Rows) { - super(cfs, filter, clause, maxResults, maxIsColumns); + super(cfs, filter, clause, maxResults, countCQL3Rows); } /* @@ -318,9 +318,9 @@ public abstract class ExtendedFilter private static class EmptyClauseFilter extends ExtendedFilter { - public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean maxIsColumns, boolean isPaging) + public EmptyClauseFilter(ColumnFamilyStore cfs, IDiskAtomFilter filter, int maxResults, boolean countCQL3Rows, boolean isPaging) { - super(cfs, filter, maxResults, maxIsColumns, isPaging); + super(cfs, filter, maxResults, countCQL3Rows, isPaging); } public IDiskAtomFilter initialFilter() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java index 9805659..f1d9611 100644 --- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java +++ b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java @@ -44,7 +44,7 @@ public interface IDiskAtomFilter * returns an iterator that returns columns from the given memtable * matching the Filter criteria in sorted order. 
*/ - public abstract OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key); + public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key); /** * Get an iterator that returns columns from the given SSTable using the opened file @@ -53,32 +53,34 @@ public interface IDiskAtomFilter * @param file Already opened file data input, saves us opening another one * @param key The key of the row we are about to iterate over */ - public abstract ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry); + public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry); /** * returns an iterator that returns columns from the given SSTable * matching the Filter criteria in sorted order. */ - public abstract ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key); + public ISSTableColumnIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key); /** * collects columns from reducedColumns into returnCF. Termination is determined * by the filter code, which should have some limit on the number of columns * to avoid running out of memory on large rows. */ - public abstract void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore); + public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore); /** * subcolumns of a supercolumn are unindexed, so to pick out parts of those we operate in-memory. * @param superColumn may be modified by filtering op. 
*/ - public abstract SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore); + public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore); public Comparator<IColumn> getColumnComparator(AbstractType<?> comparator); public boolean isReversed(); public void updateColumnsLimit(int newLimit); + public int getLiveCount(ColumnFamily cf); + public static class Serializer implements IVersionedSerializer<IDiskAtomFilter> { public static Serializer instance = new Serializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java index a347926..0581e12 100644 --- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@ -45,9 +46,19 @@ public class NamesQueryFilter implements IDiskAtomFilter public final SortedSet<ByteBuffer> columns; + // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know + // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows. 
+ private final boolean countCQL3Rows; + public NamesQueryFilter(SortedSet<ByteBuffer> columns) { + this(columns, false); + } + + public NamesQueryFilter(SortedSet<ByteBuffer> columns, boolean countCQL3Rows) + { this.columns = columns; + this.countCQL3Rows = countCQL3Rows; } public NamesQueryFilter(ByteBuffer column) @@ -55,6 +66,11 @@ public class NamesQueryFilter implements IDiskAtomFilter this(FBUtilities.singleton(column)); } + public NamesQueryFilter withUpdatedColumns(SortedSet<ByteBuffer> newColumns) + { + return new NamesQueryFilter(newColumns, countCQL3Rows); + } + public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key) { return Memtable.getNamesIterator(key, cf, this); @@ -114,6 +130,20 @@ public class NamesQueryFilter implements IDiskAtomFilter { } + public int getLiveCount(ColumnFamily cf) + { + if (countCQL3Rows) + return cf.hasOnlyTombstones() ? 0 : 1; + + int count = 0; + for (IColumn column : cf) + { + if (column.isLive()) + count++; + } + return count; + } + public static class Serializer implements IVersionedSerializer<NamesQueryFilter> { public void serialize(NamesQueryFilter f, DataOutput dos, int version) throws IOException @@ -123,6 +153,10 @@ public class NamesQueryFilter implements IDiskAtomFilter { ByteBufferUtil.writeWithShortLength(cName, dos); } + // If we talking against an older node, we have no way to tell him that we want to count CQL3 rows. This does mean that + // this node may return less data than required. The workaround being to upgrade all nodes. 
+ if (version >= MessagingService.VERSION_12) + dos.writeBoolean(f.countCQL3Rows); } public NamesQueryFilter deserialize(DataInput dis, int version) throws IOException @@ -136,7 +170,10 @@ public class NamesQueryFilter implements IDiskAtomFilter SortedSet<ByteBuffer> columns = new TreeSet<ByteBuffer>(comparator); for (int i = 0; i < size; ++i) columns.add(ByteBufferUtil.readWithShortLength(dis)); - return new NamesQueryFilter(columns); + boolean countCQL3Rows = version >= MessagingService.VERSION_12 + ? dis.readBoolean() + : false; + return new NamesQueryFilter(columns, countCQL3Rows); } public long serializedSize(NamesQueryFilter f, int version) @@ -148,6 +185,8 @@ public class NamesQueryFilter implements IDiskAtomFilter int cNameSize = cName.remaining(); size += sizes.sizeof((short) cNameSize) + cNameSize; } + if (version >= MessagingService.VERSION_12) + size += sizes.sizeof(f.countCQL3Rows); return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java index 7ef7977..2971151 100644 --- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java +++ b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java @@ -58,6 +58,11 @@ public class SliceQueryFilter implements IDiskAtomFilter this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count); } + public SliceQueryFilter(ByteBuffer start, ByteBuffer finish, boolean reversed, int count, int compositesToGroup) + { + this(new ColumnSlice[] { new ColumnSlice(start, finish) }, reversed, count, compositesToGroup, 1); + } + /** * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or * reversed). 
@@ -81,6 +86,11 @@ public class SliceQueryFilter implements IDiskAtomFilter return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup, countMutliplierForCompatibility); } + public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices) + { + return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup, countMutliplierForCompatibility); + } + public OnDiskAtomIterator getMemtableColumnIterator(ColumnFamily cf, DecoratedKey key) { return Memtable.getSliceIterator(key, cf, this); @@ -242,7 +252,7 @@ public class SliceQueryFilter implements IDiskAtomFilter @Override public String toString() { - return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + "]"; + return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]"; } public boolean isReversed() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 5a3da59..1be04dd 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -534,7 +534,7 @@ public class SecondaryIndexManager * @param dataFilter the column range to restrict to * @return found indexed rows */ - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows) { List<SecondaryIndexSearcher> indexSearchers = getIndexSearchersForQuery(clause); @@ -546,7 
+546,7 @@ public class SecondaryIndexManager throw new RuntimeException("Unable to search across multiple secondary index types"); - return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, maxIsColumns); + return indexSearchers.get(0).search(clause, range, maxResults, dataFilter, countCQL3Rows); } public Collection<SecondaryIndex> getIndexesByNames(Set<String> idxNames) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index d49169c..a8c1dde 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -38,7 +38,7 @@ public abstract class SecondaryIndexSearcher this.baseCfs = indexManager.baseCfs; } - public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns); + public abstract List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows); /** * @return true this index is able to handle given clauses. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index d5a2611..4817a00 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -78,10 +78,10 @@ public class CompositesSearcher extends SecondaryIndexSearcher } @Override - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows) { assert clause != null && !clause.isEmpty(); - ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns, false); + ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false); return baseCfs.filter(getIndexedIterator(range, filter), filter); } @@ -301,7 +301,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher if (!originalFilter.includes(baseComparator, start)) continue; - SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE); + SliceQueryFilter dataFilter = new SliceQueryFilter(start, builder.copy().buildAsEndOfRange(), false, Integer.MAX_VALUE, prefixSize); ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, path, dataFilter)); if (newData != null) { @@ -322,7 +322,7 @@ public class CompositesSearcher extends SecondaryIndexSearcher if (data == null) data = ColumnFamily.create(baseCfs.metadata); data.resolve(newData); - 
columnsCount += newData.getLiveColumnCount(); + columnsCount += dataFilter.lastCounted(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index a07f773..4be7988 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -75,10 +75,10 @@ public class KeysSearcher extends SecondaryIndexSearcher } @Override - public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean maxIsColumns) + public List<Row> search(List<IndexExpression> clause, AbstractBounds<RowPosition> range, int maxResults, IDiskAtomFilter dataFilter, boolean countCQL3Rows) { assert clause != null && !clause.isEmpty(); - ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, maxIsColumns, false); + ExtendedFilter filter = ExtendedFilter.create(baseCfs, dataFilter, clause, maxResults, countCQL3Rows, false); return baseCfs.filter(getIndexedIterator(range, filter), filter); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java index ef7beaa..ba8283f 100644 --- a/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java +++ b/src/java/org/apache/cassandra/service/RangeSliceVerbHandler.java @@ -41,9 +41,9 @@ public class RangeSliceVerbHandler implements IVerbHandler<RangeSliceCommand> { ColumnFamilyStore cfs = 
Table.open(command.keyspace).getColumnFamilyStore(command.column_family); if (cfs.indexManager.hasIndexFor(command.row_filter)) - return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.maxIsColumns); + return cfs.search(command.row_filter, command.range, command.maxResults, command.predicate, command.countCQL3Rows); else - return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, command.predicate, command.row_filter, command.maxIsColumns, command.isPaging); + return cfs.getRangeSlice(command.super_column, command.range, command.maxResults, command.predicate, command.row_filter, command.countCQL3Rows, command.isPaging); } public void doVerb(MessageIn<RangeSliceCommand> message, String id) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/RowRepairResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RowRepairResolver.java b/src/java/org/apache/cassandra/service/RowRepairResolver.java index 975b204..21cf5ab 100644 --- a/src/java/org/apache/cassandra/service/RowRepairResolver.java +++ b/src/java/org/apache/cassandra/service/RowRepairResolver.java @@ -44,12 +44,12 @@ public class RowRepairResolver extends AbstractRowResolver { private int maxLiveCount = 0; public List<IAsyncResult> repairResults = Collections.emptyList(); - private final SliceQueryFilter filter; // can be null if names query + private final IDiskAtomFilter filter; public RowRepairResolver(String table, ByteBuffer key, IDiskAtomFilter qFilter) { super(key, table); - this.filter = qFilter instanceof SliceQueryFilter ? 
(SliceQueryFilter)qFilter : null; + this.filter = qFilter; } /* @@ -80,7 +80,7 @@ public class RowRepairResolver extends AbstractRowResolver endpoints.add(message.from); // compute maxLiveCount to prevent short reads -- see https://issues.apache.org/jira/browse/CASSANDRA-2643 - int liveCount = cf == null ? 0 : (filter == null ? cf.getLiveColumnCount() : filter.getLiveCount(cf)); + int liveCount = cf == null ? 0 : filter.getLiveCount(cf); if (liveCount > maxLiveCount) maxLiveCount = liveCount; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index b747075..0c3eae9 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -41,6 +41,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.Table; +import org.apache.cassandra.db.filter.ColumnSlice; import org.apache.cassandra.db.filter.IDiskAtomFilter; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.db.filter.SliceQueryFilter; @@ -1093,13 +1094,9 @@ public class StorageProxy implements StorageProxyMBean // now scan until we have enough results try { - final IDiskAtomFilter emptyPredicate = new SliceQueryFilter(ByteBufferUtil.EMPTY_BYTE_BUFFER, - ByteBufferUtil.EMPTY_BYTE_BUFFER, - false, - -1); IDiskAtomFilter commandPredicate = command.predicate; - int columnsCount = 0; + int cql3RowCount = 0; rows = new ArrayList<Row>(); List<AbstractBounds<RowPosition>> ranges = getRestrictedRanges(command.range); for (AbstractBounds<RowPosition> range : ranges) @@ -1111,7 +1108,7 @@ public class StorageProxy implements StorageProxyMBean range, 
command.row_filter, command.maxResults, - command.maxIsColumns, + command.countCQL3Rows, command.isPaging); List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right); @@ -1144,7 +1141,8 @@ public class StorageProxy implements StorageProxyMBean for (Row row : handler.get()) { rows.add(row); - columnsCount += row.getLiveColumnCount(); + if (nodeCmd.countCQL3Rows) + cql3RowCount += row.getLiveCount(commandPredicate); logger.trace("range slices read {}", row.key); } FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout()); @@ -1162,14 +1160,18 @@ public class StorageProxy implements StorageProxyMBean } // if we're done, great, otherwise, move to the next range - int count = nodeCmd.maxIsColumns ? columnsCount : rows.size(); + int count = nodeCmd.countCQL3Rows ? cql3RowCount : rows.size(); if (count >= nodeCmd.maxResults) break; // if we are paging and already got some rows, reset the column filter predicate, // so we start iterating the next row from the first column if (!rows.isEmpty() && command.isPaging) - commandPredicate = emptyPredicate; + { + // We only allow paging with a slice filter (doesn't make sense otherwise anyway) + assert commandPredicate instanceof SliceQueryFilter; + commandPredicate = ((SliceQueryFilter)commandPredicate).withUpdatedSlices(ColumnSlice.ALL_COLUMNS_ARRAY); + } } } finally @@ -1189,8 +1191,8 @@ public class StorageProxy implements StorageProxyMBean private static List<Row> trim(RangeSliceCommand command, List<Row> rows) { - // When maxIsColumns, we let the caller trim the result. - if (command.maxIsColumns) + // When countCQL3Rows, we let the caller trim the result. + if (command.countCQL3Rows) return rows; else return rows.size() > command.maxResults ? 
rows.subList(0, command.maxResults) : rows; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a32eb9f7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index 625d25f..cd34a91 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -948,7 +948,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader int columns = 0; for (Row row : rows) { - columns += row.getLiveColumnCount(); + columns += row.getLiveCount(new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, expectedCount)); } assert columns == expectedCount : "Expected " + expectedCount + " live columns but got " + columns + ": " + rows; }
