// http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
// ----------------------------------------------------------------------
// diff --git a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java b/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
// deleted file mode 100644
// index a541d5e..0000000
// --- a/src/java/org/apache/cassandra/db/filter/IDiskAtomFilter.java
// +++ /dev/null
// @@ -1,146 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.filter;

import java.io.DataInput;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;

import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;

/**
 * Given an implementation-specific description of which columns to look for, provides methods
 * to extract the desired columns from a Memtable ({@link ColumnFamily}) or an SSTable.
 * {@link QueryFilter} pairs a filter with a partition key and takes care of collating the
 * results coming from several sources.
 */
public interface IDiskAtomFilter
{
    /**
     * Returns an iterator over the cells of the given column family that match
     * this filter, in sorted order.
     */
    public Iterator<Cell> getColumnIterator(ColumnFamily cf);

    /**
     * Same as {@link #getColumnIterator(ColumnFamily)} but returned as an
     * {@link OnDiskAtomIterator} associated with the given partition key.
     */
    public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf);

    /**
     * Gets an iterator that returns the columns of the given SSTable matching this
     * filter, in sorted order, reading through an already opened file.
     *
     * @param sstable the SSTable to read from
     * @param file already opened file data input; saves opening another one
     * @param key the key of the row we are about to iterate over
     * @param indexEntry the index entry for {@code key} (presumably used to seek within
     *                   {@code file}; TODO confirm against implementations)
     */
    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry);

    /**
     * Returns an iterator over the columns of the given SSTable matching this
     * filter, in sorted order.
     */
    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key);

    /**
     * Collects columns from {@code reducedColumns} into {@code container}. Termination is
     * determined by the filter implementation, which should bound the number of columns
     * collected to avoid running out of memory on large rows.
     */
    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now);

    public Comparator<Cell> getColumnComparator(CellNameType comparator);

    public boolean isReversed();
    public void updateColumnsLimit(int newLimit);

    public int getLiveCount(ColumnFamily cf, long now);
    public ColumnCounter columnCounter(CellNameType comparator, long now);

    public IDiskAtomFilter cloneShallow();
    public boolean maySelectPrefix(CType type, Composite prefix);

    public boolean shouldInclude(SSTableReader sstable);

    public boolean countCQL3Rows(CellNameType comparator);

    public boolean isHeadFilter();

    /**
     * Whether the provided cf, which is assumed to contain the head of the
     * partition, contains enough data to cover this filter.
     */
    public boolean isFullyCoveredBy(ColumnFamily cf, long now);

    /**
     * Serializes a filter as a one-byte type tag followed by the payload of the
     * concrete filter's own serializer.
     */
    public static class Serializer implements IVersionedSerializer<IDiskAtomFilter>
    {
        // Wire tags identifying the concrete filter type; values must never change.
        private static final int SLICE_FILTER_TAG = 0;
        private static final int NAMES_FILTER_TAG = 1;

        private final CellNameType type;

        public Serializer(CellNameType type)
        {
            this.type = type;
        }

        /**
         * @throws IllegalArgumentException if {@code filter} is neither a {@link SliceQueryFilter}
         *         nor a {@link NamesQueryFilter}; checked before anything is written to {@code out}
         */
        public void serialize(IDiskAtomFilter filter, DataOutputPlus out, int version) throws IOException
        {
            if (filter instanceof SliceQueryFilter)
            {
                out.writeByte(SLICE_FILTER_TAG);
                type.sliceQueryFilterSerializer().serialize((SliceQueryFilter) filter, out, version);
            }
            else if (filter instanceof NamesQueryFilter)
            {
                out.writeByte(NAMES_FILTER_TAG);
                type.namesQueryFilterSerializer().serialize((NamesQueryFilter) filter, out, version);
            }
            else
            {
                // Previously the tag byte was written before a blind cast, leaving a partial write behind.
                throw new IllegalArgumentException("Unsupported filter type: " + (filter == null ? "null" : filter.getClass().getName()));
            }
        }

        /**
         * @throws IOException if the leading tag byte is not a known filter type (corrupt or
         *         incompatible input); previously this was only caught when assertions were enabled
         */
        public IDiskAtomFilter deserialize(DataInput in, int version) throws IOException
        {
            int tag = in.readByte();
            switch (tag)
            {
                case SLICE_FILTER_TAG:
                    return type.sliceQueryFilterSerializer().deserialize(in, version);
                case NAMES_FILTER_TAG:
                    return type.namesQueryFilterSerializer().deserialize(in, version);
                default:
                    throw new IOException("Unknown IDiskAtomFilter type tag: " + tag);
            }
        }

        public long serializedSize(IDiskAtomFilter filter, int version)
        {
            // long accumulator: "int += long" would silently narrow the sub-serializer sizes.
            long size = 1; // type tag byte
            if (filter instanceof SliceQueryFilter)
                size += type.sliceQueryFilterSerializer().serializedSize((SliceQueryFilter) filter, version);
            else
                size += type.namesQueryFilterSerializer().serializedSize((NamesQueryFilter) filter, version);
            return size;
        }
    }

    public Iterator<RangeTombstone> getRangeTombstoneIterator(ColumnFamily source);
}
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
// ----------------------------------------------------------------------
// diff --git a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java b/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
// deleted file mode 100644
// index c8f63bb..0000000
// --- a/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
// +++ /dev/null
// @@ -1,301 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.filter;

import java.io.DataInput;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;

import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.utils.SearchIterator;

/**
 * A filter selecting an explicit, sorted set of cell names.
 */
public class NamesQueryFilter implements IDiskAtomFilter
{
    public final SortedSet<CellName> columns;

    // If true, getLiveCount will always return either 0 or 1. This uses the fact that we know
    // CQL3 will never use a name filter with cell names spanning multiple CQL3 rows.
    private final boolean countCQL3Rows;

    public NamesQueryFilter(SortedSet<CellName> columns)
    {
        this(columns, false);
    }

    public NamesQueryFilter(SortedSet<CellName> columns, boolean countCQL3Rows)
    {
        this.columns = columns;
        this.countCQL3Rows = countCQL3Rows;
    }

    public NamesQueryFilter cloneShallow()
    {
        // NQF is immutable as far as shallow cloning is concerned, so save the allocation.
        return this;
    }

    public NamesQueryFilter withUpdatedColumns(SortedSet<CellName> newColumns)
    {
        return new NamesQueryFilter(newColumns, countCQL3Rows);
    }

    @SuppressWarnings("unchecked")
    public Iterator<Cell> getColumnIterator(ColumnFamily cf)
    {
        assert cf != null;
        // ByNameColumnIterator only ever yields Cells, so the OnDiskAtom -> Cell view is safe.
        return (Iterator<Cell>) (Iterator<?>) new ByNameColumnIterator(columns.iterator(), null, cf);
    }

    public OnDiskAtomIterator getColumnIterator(DecoratedKey key, ColumnFamily cf)
    {
        assert cf != null;
        return new ByNameColumnIterator(columns.iterator(), key, cf);
    }

    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key)
    {
        return sstable.iterator(key, columns);
    }

    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry)
    {
        return sstable.iterator(file, key, columns, indexEntry);
    }

    public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now)
    {
        DeletionInfo.InOrderTester tester = container.inOrderDeletionTester();
        while (reducedColumns.hasNext())
            container.maybeAppendColumn(reducedColumns.next(), tester, gcBefore);
    }

    public Comparator<Cell> getColumnComparator(CellNameType comparator)
    {
        return comparator.columnComparator(false);
    }

    @Override
    public String toString()
    {
        return "NamesQueryFilter(" +
               "columns=" + StringUtils.join(columns, ",") +
               ')';
    }

    public boolean isReversed()
    {
        return false;
    }

    public void updateColumnsLimit(int newLimit)
    {
    }

    public int getLiveCount(ColumnFamily cf, long now)
    {
        // Note: we could use columnCounter() but we save the object allocation as it's simple enough

        if (countCQL3Rows)
            return cf.hasOnlyTombstones(now) ? 0 : 1;

        int count = 0;
        for (Cell cell : cf)
        {
            if (cell.isLive(now))
                count++;
        }
        return count;
    }

    public boolean maySelectPrefix(CType type, Composite prefix)
    {
        for (CellName column : columns)
        {
            if (prefix.isPrefixOf(type, column))
                return true;
        }
        return false;
    }

    public boolean shouldInclude(SSTableReader sstable)
    {
        return true;
    }

    /**
     * cf covers all the requested columns if the range it spans (from its first to its
     * last cell name) includes all of them.
     * <p>
     * An empty names set is trivially covered. An empty cf cannot be shown to cover
     * anything, so it conservatively reports {@code false}; previously both cases threw
     * {@link java.util.NoSuchElementException}.
     */
    public boolean isFullyCoveredBy(ColumnFamily cf, long now)
    {
        if (columns.isEmpty())
            return true;

        Iterator<Cell> forward = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY);
        if (!forward.hasNext())
            return false;

        CellName first = forward.next().name();
        CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name();

        return cf.getComparator().compare(first, columns.first()) <= 0
            && cf.getComparator().compare(columns.last(), last) <= 0;
    }

    public boolean isHeadFilter()
    {
        return false;
    }

    public boolean countCQL3Rows(CellNameType comparator)
    {
        return countCQL3Rows;
    }

    public boolean countCQL3Rows()
    {
        return countCQL3Rows(null);
    }

    public ColumnCounter columnCounter(CellNameType comparator, long now)
    {
        return countCQL3Rows
             ? new ColumnCounter.GroupByPrefix(now, null, 0)
             : new ColumnCounter(now);
    }

    /**
     * Walks the requested names in order and yields the matching cells of {@code cf},
     * skipping names that have no cell.
     */
    private static class ByNameColumnIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator
    {
        private final ColumnFamily cf;
        private final DecoratedKey key;
        private final Iterator<CellName> names;
        private final SearchIterator<CellName, Cell> cells;

        public ByNameColumnIterator(Iterator<CellName> names, DecoratedKey key, ColumnFamily cf)
        {
            this.names = names;
            this.cf = cf;
            this.key = key;
            this.cells = cf.searchIterator();
        }

        protected OnDiskAtom computeNext()
        {
            while (names.hasNext() && cells.hasNext())
            {
                CellName current = names.next();
                Cell cell = cells.next(current);
                if (cell != null)
                    return cell;
            }
            return endOfData();
        }

        public ColumnFamily getColumnFamily()
        {
            return cf;
        }

        public DecoratedKey getKey()
        {
            return key;
        }

        public void close() throws IOException { }
    }

    /**
     * Wire format: int count, each cell name via the type's cell serializer, then the
     * countCQL3Rows boolean.
     */
    public static class Serializer implements IVersionedSerializer<NamesQueryFilter>
    {
        private final CellNameType type;

        public Serializer(CellNameType type)
        {
            this.type = type;
        }

        public void serialize(NamesQueryFilter f, DataOutputPlus out, int version) throws IOException
        {
            out.writeInt(f.columns.size());
            ISerializer<CellName> serializer = type.cellSerializer();
            for (CellName cName : f.columns)
            {
                serializer.serialize(cName, out);
            }
            out.writeBoolean(f.countCQL3Rows);
        }

        public NamesQueryFilter deserialize(DataInput in, int version) throws IOException
        {
            int size = in.readInt();
            SortedSet<CellName> columns = new TreeSet<>(type);
            ISerializer<CellName> serializer = type.cellSerializer();
            for (int i = 0; i < size; ++i)
                columns.add(serializer.deserialize(in));
            boolean countCQL3Rows = in.readBoolean();
            return new NamesQueryFilter(columns, countCQL3Rows);
        }

        public long serializedSize(NamesQueryFilter f, int version)
        {
            TypeSizes sizes = TypeSizes.NATIVE;
            // long accumulator: "int += long" would silently narrow each cell's serialized size.
            long size = sizes.sizeof(f.columns.size());
            ISerializer<CellName> serializer = type.cellSerializer();
            for (CellName cName : f.columns)
                size += serializer.serializedSize(cName, sizes);
            size += sizes.sizeof(f.countCQL3Rows);
            return size;
        }
    }

    /**
     * Returns the range tombstones of {@code source} covering at least one of the requested
     * names. The same tombstone may be returned once per name it covers.
     */
    public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily source)
    {
        if (!source.deletionInfo().hasRanges())
            return Iterators.emptyIterator();

        return new AbstractIterator<RangeTombstone>()
        {
            private final Iterator<CellName> names = columns.iterator();
            private RangeTombstone lastFindRange;

            protected RangeTombstone computeNext()
            {
                while (names.hasNext())
                {
                    CellName next = names.next();
                    if (lastFindRange != null && lastFindRange.includes(source.getComparator(), next))
                        return lastFindRange;

                    // We keep the last range around since, names being in sorted order,
                    // it may match the next name too.
                    lastFindRange = source.deletionInfo().rangeCovering(next);
                    if (lastFindRange != null)
                        return lastFindRange;
                }
                return endOfData();
            }
        };
    }
}
// http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/QueryFilter.java
// ----------------------------------------------------------------------
// diff --git a/src/java/org/apache/cassandra/db/filter/QueryFilter.java b/src/java/org/apache/cassandra/db/filter/QueryFilter.java
// deleted file mode 100644
// index 15ee33d..0000000
// --- a/src/java/org/apache/cassandra/db/filter/QueryFilter.java
// +++ /dev/null
// @@ -1,262 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.filter;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedSet;

import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.DeletionInfo;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.columniterator.OnDiskAtomIterator;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.MergeIterator;

/**
 * Pairs an {@link IDiskAtomFilter} with the partition key, table name and query
 * timestamp it applies to, and collates results from several sources.
 */
public class QueryFilter
{
    public final DecoratedKey key;
    public final String cfName;
    public final IDiskAtomFilter filter;
    public final long timestamp;

    public QueryFilter(DecoratedKey key, String cfName, IDiskAtomFilter filter, long timestamp)
    {
        this.key = key;
        this.cfName = cfName;
        this.filter = filter;
        this.timestamp = timestamp;
    }

    public Iterator<Cell> getIterator(ColumnFamily cf)
    {
        assert cf != null;
        return filter.getColumnIterator(cf);
    }

    public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable)
    {
        return filter.getSSTableColumnIterator(sstable, key);
    }

    public void collateOnDiskAtom(ColumnFamily returnCF,
                                  List<? extends Iterator<? extends OnDiskAtom>> toCollate,
                                  int gcBefore)
    {
        collateOnDiskAtom(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
    }

    public static void collateOnDiskAtom(ColumnFamily returnCF,
                                         List<? extends Iterator<? extends OnDiskAtom>> toCollate,
                                         IDiskAtomFilter filter,
                                         DecoratedKey key,
                                         int gcBefore,
                                         long timestamp)
    {
        List<Iterator<Cell>> filteredIterators = new ArrayList<>(toCollate.size());
        for (Iterator<? extends OnDiskAtom> iter : toCollate)
            filteredIterators.add(gatherTombstones(returnCF, iter));
        collateColumns(returnCF, filteredIterators, filter, key, gcBefore, timestamp);
    }

    // When there is only a single source of atoms, we can skip the collate step
    public void collateOnDiskAtom(ColumnFamily returnCF, Iterator<? extends OnDiskAtom> toCollate, int gcBefore)
    {
        filter.collectReducedColumns(returnCF, gatherTombstones(returnCF, toCollate), this.key, gcBefore, timestamp);
    }

    public void collateColumns(ColumnFamily returnCF, List<? extends Iterator<Cell>> toCollate, int gcBefore)
    {
        collateColumns(returnCF, toCollate, filter, this.key, gcBefore, timestamp);
    }

    public static void collateColumns(ColumnFamily returnCF,
                                      List<? extends Iterator<Cell>> toCollate,
                                      IDiskAtomFilter filter,
                                      DecoratedKey key,
                                      int gcBefore,
                                      long timestamp)
    {
        Comparator<Cell> comparator = filter.getColumnComparator(returnCF.getComparator());

        // A single source needs no merging.
        Iterator<Cell> reduced = toCollate.size() == 1
                               ? toCollate.get(0)
                               : MergeIterator.get(toCollate, comparator, getReducer(comparator));

        filter.collectReducedColumns(returnCF, reduced, key, gcBefore, timestamp);
    }

    private static MergeIterator.Reducer<Cell, Cell> getReducer(final Comparator<Cell> comparator)
    {
        // Define a 'reduced' iterator that merges columns with the same name, which
        // greatly simplifies computing liveColumns in the presence of tombstones.
        return new MergeIterator.Reducer<Cell, Cell>()
        {
            Cell current;

            public void reduce(Cell next)
            {
                assert current == null || comparator.compare(current, next) == 0;
                current = current == null ? next : current.reconcile(next);
            }

            protected Cell getReduced()
            {
                assert current != null;
                Cell toReturn = current;
                current = null;
                return toReturn;
            }

            @Override
            public boolean trivialReduceIsTrivial()
            {
                return true;
            }
        };
    }

    /**
     * Given an iterator of on-disk atoms, returns an iterator that diverts the range tombstone
     * markers into {@code returnCF} (via {@code addAtom}) and yields only the cells.
     * <p>
     * The returned iterator follows the {@link Iterator} contract: {@code next()} throws
     * {@link NoSuchElementException} once exhausted (previously it returned {@code null}
     * when assertions were disabled), and {@code remove()} is unsupported.
     */
    public static Iterator<Cell> gatherTombstones(final ColumnFamily returnCF, final Iterator<? extends OnDiskAtom> iter)
    {
        return new Iterator<Cell>()
        {
            // Look-ahead cell; null when not yet computed or when the source is exhausted.
            private Cell next;

            public boolean hasNext()
            {
                if (next != null)
                    return true;

                getNext();
                return next != null;
            }

            public Cell next()
            {
                if (!hasNext())
                    throw new NoSuchElementException();

                Cell toReturn = next;
                next = null;
                return toReturn;
            }

            // Advances the source until a Cell is found, adding any non-cell atom to returnCF.
            private void getNext()
            {
                while (iter.hasNext())
                {
                    OnDiskAtom atom = iter.next();

                    if (atom instanceof Cell)
                    {
                        next = (Cell) atom;
                        break;
                    }
                    else
                    {
                        returnCF.addAtom(atom);
                    }
                }
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }
        };
    }

    public String getColumnFamilyName()
    {
        return cfName;
    }

    /**
     * @return a QueryFilter object to satisfy the given slice criteria:
     * @param key the row to slice
     * @param cfName column family to query
     * @param start column to start slice at, inclusive; empty for "the first column"
     * @param finish column to stop slice at, inclusive; empty for "the last column"
     * @param reversed true to start with the largest column (as determined by configured sort order) instead of smallest
     * @param limit maximum number of non-deleted columns to return
     * @param timestamp time to use for determining expiring columns' state
     */
    public static QueryFilter getSliceFilter(DecoratedKey key,
                                             String cfName,
                                             Composite start,
                                             Composite finish,
                                             boolean reversed,
                                             int limit,
                                             long timestamp)
    {
        return new QueryFilter(key, cfName, new SliceQueryFilter(start, finish, reversed, limit), timestamp);
    }

    /**
     * Returns a QueryFilter object that includes every column in the row.
     * This is dangerous on large rows; avoid except for test code.
     */
    public static QueryFilter getIdentityFilter(DecoratedKey key, String cfName, long timestamp)
    {
        return new QueryFilter(key, cfName, new IdentityQueryFilter(), timestamp);
    }

    /**
     * @return a QueryFilter object that will return columns matching the given names
     * @param key the row to slice
     * @param cfName column family to query
     * @param columns the column names to restrict the results to, sorted in comparator order
     * @param timestamp time to use for determining expiring columns' state
     */
    public static QueryFilter getNamesFilter(DecoratedKey key, String cfName, SortedSet<CellName> columns, long timestamp)
    {
        return new QueryFilter(key, cfName, new NamesQueryFilter(columns), timestamp);
    }

    @Override
    public String toString()
    {
        return getClass().getSimpleName() + "(key=" + key + ", cfName=" + cfName + (filter == null ? "" : ", filter=" + filter) + ")";
    }

    public boolean shouldInclude(SSTableReader sstable)
    {
        return filter.shouldInclude(sstable);
    }

    /**
     * Adds to {@code target} the top-level deletion of {@code source} and the range
     * tombstones of {@code source} selected by this filter.
     */
    public void delete(DeletionInfo target, ColumnFamily source)
    {
        target.add(source.deletionInfo().getTopLevelDeletion());
        // source is the CF currently in the memtable, and it can be large compared to what the filter selects,
        // so only consider those range tombstones that the filter does select.
        for (Iterator<RangeTombstone> iter = filter.getRangeTombstoneIterator(source); iter.hasNext(); )
            target.add(iter.next(), source.getComparator());
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java new file mode 100644 index 0000000..aff8d16 --- /dev/null +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -0,0 +1,784 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.filter; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.base.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.cql3.statements.RequestValidations.*; + +/** + * A filter on which rows a given query should include or exclude. + * <p> + * This corresponds to the restrictions on rows that are not handled by the query + * {@link ClusteringIndexFilter}. Some of the expressions of this filter may + * be handled by a 2ndary index, and the rest is simply filtered out from the + * result set (the later can only happen if the query was using ALLOW FILTERING). 
+ */ +public abstract class RowFilter implements Iterable<RowFilter.Expression> +{ + public static final Serializer serializer = new Serializer(); + public static final RowFilter NONE = new CQLFilter(Collections.<Expression>emptyList()); + + protected final List<Expression> expressions; + + protected RowFilter(List<Expression> expressions) + { + this.expressions = expressions; + } + + public static RowFilter create() + { + return new CQLFilter(new ArrayList<Expression>()); + } + + public static RowFilter create(int capacity) + { + return new CQLFilter(new ArrayList<Expression>(capacity)); + } + + public static RowFilter forThrift(int capacity) + { + return new ThriftFilter(new ArrayList<Expression>(capacity)); + } + + public void add(ColumnDefinition def, Operator op, ByteBuffer value) + { + expressions.add(new SimpleExpression(def, op, value)); + } + + public void addMapEquality(ColumnDefinition def, ByteBuffer key, Operator op, ByteBuffer value) + { + expressions.add(new MapEqualityExpression(def, key, op, value)); + } + + public void addThriftExpression(CFMetaData metadata, ByteBuffer name, Operator op, ByteBuffer value) + { + assert (this instanceof ThriftFilter); + expressions.add(new ThriftExpression(metadata, name, op, value)); + } + + /** + * Filters the provided iterator so that only the row satisfying the expression of this filter + * are included in the resulting iterator. + * + * @param iter the iterator to filter + * @param nowInSec the time of query in seconds. + * @return the filtered iterator. + */ + public abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec); + + /** + * Returns this filter but without the provided expression. This method + * *assumes* that the filter contains the provided expression. 
+ */ + public RowFilter without(Expression expression) + { + assert expressions.contains(expression); + if (expressions.size() == 1) + return RowFilter.NONE; + + List<Expression> newExpressions = new ArrayList<>(expressions.size() - 1); + for (Expression e : expressions) + if (!e.equals(expression)) + newExpressions.add(e); + + return withNewExpressions(newExpressions); + } + + protected abstract RowFilter withNewExpressions(List<Expression> expressions); + + public boolean isEmpty() + { + return expressions.isEmpty(); + } + + public Iterator<Expression> iterator() + { + return expressions.iterator(); + } + + private static Clustering makeCompactClustering(CFMetaData metadata, ByteBuffer name) + { + assert metadata.isCompactTable(); + if (metadata.isCompound()) + { + List<ByteBuffer> values = CompositeType.splitName(name); + return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()])); + } + else + { + return new SimpleClustering(name); + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < expressions.size(); i++) + { + if (i > 0) + sb.append(" AND "); + sb.append(expressions.get(i)); + } + return sb.toString(); + } + + private static class CQLFilter extends RowFilter + { + private CQLFilter(List<Expression> expressions) + { + super(expressions); + } + + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec) + { + if (expressions.isEmpty()) + return iter; + + return new WrappingUnfilteredPartitionIterator(iter) + { + @Override + public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter) + { + return new FilteringRowIterator(iter) + { + // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them. 
+ // (we should however not filter them in the output of the method, hence it's not used as row filter for the
+ // FilteringRowIterator)
+ private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec);
+
+ protected boolean includeRow(Row row)
+ {
+ return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row));
+ }
+ };
+ }
+ };
+ }
+
+ /**
+ * Returns whether the provided row (with its partition key) satisfies
+ * this row filter or not (that is, if it satisfies all of its expressions).
+ */
+ private boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ for (Expression e : expressions)
+ if (!e.isSatisfiedBy(partitionKey, row))
+ return false;
+
+ return true;
+ }
+
+ protected RowFilter withNewExpressions(List<Expression> expressions)
+ {
+ return new CQLFilter(expressions);
+ }
+ }
+
+ private static class ThriftFilter extends RowFilter
+ {
+ private ThriftFilter(List<Expression> expressions)
+ {
+ super(expressions);
+ }
+
+ public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec)
+ {
+ if (expressions.isEmpty())
+ return iter;
+
+ return new WrappingUnfilteredPartitionIterator(iter)
+ {
+ @Override
+ public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter)
+ {
+ // Thrift does not filter rows, it filters the entire partition if any of the expressions is not
+ // satisfied, which forces us to materialize the result (in theory we could materialize only
+ // what we need, which might or might not be everything, but we keep it simple since in practice
+ // it has never seemed worth the effort).
+ ArrayBackedPartition result = ArrayBackedPartition.create(iter);
+
+ // The partition needs to have a row for every expression, and that row needs to satisfy the expression.
+ for (Expression expr : expressions)
+ {
+ assert expr instanceof ThriftExpression;
+ Row row = result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
+ if (row == null || !expr.isSatisfiedBy(iter.partitionKey(), row))
+ return null; // one failing expression filters out the whole partition
+ }
+ // If we get there, it means all expressions were satisfied, so return the whole (materialized) partition
+ return result.unfilteredIterator();
+ }
+ };
+ }
+
+ protected RowFilter withNewExpressions(List<Expression> expressions)
+ {
+ return new ThriftFilter(expressions);
+ }
+ }
+
+ public static abstract class Expression
+ {
+ private static final Serializer serializer = new Serializer();
+
+ // Note: the order of this enum matters, its ordinal is what gets serialized (see Serializer)
+ protected enum Kind { SIMPLE, MAP_EQUALITY, THRIFT_DYN_EXPR }
+
+ abstract Kind kind();
+ protected final ColumnDefinition column;
+ protected final Operator operator;
+ protected final ByteBuffer value;
+
+ protected Expression(ColumnDefinition column, Operator operator, ByteBuffer value)
+ {
+ this.column = column;
+ this.operator = operator;
+ this.value = value;
+ }
+
+ public ColumnDefinition column()
+ {
+ return column;
+ }
+
+ public Operator operator()
+ {
+ return operator;
+ }
+
+ /**
+ * Checks if the operator of this expression is a <code>CONTAINS</code> operator.
+ *
+ * @return <code>true</code> if the operator of this expression is a <code>CONTAINS</code>
+ * operator, <code>false</code> otherwise.
+ */
+ public boolean isContains()
+ {
+ return Operator.CONTAINS == operator;
+ }
+
+ /**
+ * Checks if the operator of this expression is a <code>CONTAINS_KEY</code> operator.
+ *
+ * @return <code>true</code> if the operator of this expression is a <code>CONTAINS_KEY</code>
+ * operator, <code>false</code> otherwise.
+ */
+ public boolean isContainsKey()
+ {
+ return Operator.CONTAINS_KEY == operator;
+ }
+
+ /**
+ * If this expression is used to query an index, the value to use as
+ * partition key for that index query.
+ */
+ public ByteBuffer getIndexValue()
+ {
+ return value;
+ }
+
+ public void validateForIndexing() throws InvalidRequestException // rejects null, unset and over-64K values
+ {
+ checkNotNull(value, "Unsupported null value for indexed column %s", column.name);
+ checkBindValueSet(value, "Unsupported unset value for indexed column %s", column.name);
+ checkFalse(value.remaining() > FBUtilities.MAX_UNSIGNED_SHORT, "Index expression values may not be larger than 64K");
+ }
+
+ /**
+ * Returns whether the provided row satisfies this expression or not.
+ *
+ * @param partitionKey the partition key of the row to check.
+ * @param row the row to check. It should *not* contain deleted cells
+ * (i.e. it should come from a RowIterator).
+ * @return whether the row is satisfied by this expression.
+ */
+ public abstract boolean isSatisfiedBy(DecoratedKey partitionKey, Row row);
+
+ protected ByteBuffer getValue(DecoratedKey partitionKey, Row row) // value of 'column' for the row, or null if it has no such cell
+ {
+ switch (column.kind)
+ {
+ case PARTITION_KEY:
+ return column.isOnAllComponents()
+ ? partitionKey.getKey()
+ : CompositeType.extractComponent(partitionKey.getKey(), column.position());
+ case CLUSTERING_COLUMN:
+ return row.clustering().get(column.position());
+ default:
+ Cell cell = row.getCell(column);
+ return cell == null ? 
null : cell.value();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof Expression))
+ return false;
+
+ Expression that = (Expression)o;
+
+ return Objects.equal(this.kind(), that.kind())
+ && Objects.equal(this.column.name, that.column.name)
+ && Objects.equal(this.operator, that.operator)
+ && Objects.equal(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(column.name, operator, value);
+ }
+
+ private static class Serializer // serialize, deserialize and serializedSize must agree on the exact wire layout
+ {
+ public void serialize(Expression expression, DataOutputPlus out, int version) throws IOException
+ {
+ ByteBufferUtil.writeWithShortLength(expression.column.name.bytes, out);
+ expression.operator.writeTo(out);
+
+ if (version >= MessagingService.VERSION_30)
+ out.writeByte(expression.kind().ordinal()); // pre-3.0 peers infer the kind instead (see deserialize)
+
+ switch (expression.kind())
+ {
+ case SIMPLE:
+ ByteBufferUtil.writeWithShortLength(((SimpleExpression)expression).value, out);
+ break;
+ case MAP_EQUALITY:
+ MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+ if (version < MessagingService.VERSION_30)
+ {
+ ByteBufferUtil.writeWithShortLength(mexpr.getIndexValue(), out); // (key, value) composite
+ }
+ else
+ {
+ ByteBufferUtil.writeWithShortLength(mexpr.key, out);
+ ByteBufferUtil.writeWithShortLength(mexpr.value, out);
+ }
+ break;
+ case THRIFT_DYN_EXPR:
+ ByteBufferUtil.writeWithShortLength(((ThriftExpression)expression).value, out);
+ break;
+ }
+ }
+
+ public Expression deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ {
+ ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+ Operator operator = Operator.readFrom(in);
+
+ ColumnDefinition column = metadata.getColumnDefinition(name);
+ if (!metadata.isCompactTable() && column == null)
+ throw new RuntimeException("Unknown (or dropped) column " + UTF8Type.instance.getString(name) + " during deserialization");
+
+ Kind kind;
+ if (version >= MessagingService.VERSION_30)
+ {
+ kind = 
Kind.values()[in.readByte()];
+ }
+ else
+ {
+ if (column == null)
+ kind = Kind.THRIFT_DYN_EXPR;
+ else if (column.type instanceof MapType && operator == Operator.EQ)
+ kind = Kind.MAP_EQUALITY;
+ else
+ kind = Kind.SIMPLE;
+ }
+
+ switch (kind)
+ {
+ case SIMPLE:
+ return new SimpleExpression(column, operator, ByteBufferUtil.readWithShortLength(in));
+ case MAP_EQUALITY:
+ ByteBuffer key, value;
+ if (version < MessagingService.VERSION_30)
+ {
+ ByteBuffer composite = ByteBufferUtil.readWithShortLength(in);
+ key = CompositeType.extractComponent(composite, 0);
+ value = CompositeType.extractComponent(composite, 1); // component 1: getIndexValue() builds the composite as (key, value)
+ }
+ else
+ {
+ key = ByteBufferUtil.readWithShortLength(in);
+ value = ByteBufferUtil.readWithShortLength(in);
+ }
+ return new MapEqualityExpression(column, key, operator, value);
+ case THRIFT_DYN_EXPR:
+ return new ThriftExpression(metadata, name, operator, ByteBufferUtil.readWithShortLength(in));
+ }
+ throw new AssertionError();
+ }
+
+ public long serializedSize(Expression expression, int version)
+ {
+ TypeSizes sizes = TypeSizes.NATIVE;
+ long size = ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes, sizes)
+ + expression.operator.serializedSize() + (version >= MessagingService.VERSION_30 ? 1 : 0); // + kind byte written by serialize() for 3.0+
+
+ switch (expression.kind())
+ {
+ case SIMPLE:
+ size += ByteBufferUtil.serializedSizeWithShortLength(((SimpleExpression)expression).value, sizes);
+ break;
+ case MAP_EQUALITY:
+ MapEqualityExpression mexpr = (MapEqualityExpression)expression;
+ if (version < MessagingService.VERSION_30)
+ size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.getIndexValue(), sizes);
+ else
+ size += ByteBufferUtil.serializedSizeWithShortLength(mexpr.key, sizes)
+ + ByteBufferUtil.serializedSizeWithShortLength(mexpr.value, sizes);
+ break;
+ case THRIFT_DYN_EXPR:
+ size += ByteBufferUtil.serializedSizeWithShortLength(((ThriftExpression)expression).value, sizes);
+ break;
+ }
+ return size;
+ }
+ }
+ }
+
+ /**
+ * An expression of the form 'column' 'op' 'value'.
+ */
+ private static class SimpleExpression extends Expression
+ {
+ public SimpleExpression(ColumnDefinition column, Operator operator, ByteBuffer value)
+ {
+ super(column, operator, value);
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+ // TODO: we should try to merge both code paths someday.
+ assert value != null;
+
+ if (row.isStatic() != column.isStatic())
+ return true; // only rows with the same static-ness as the column are constrained by it
+
+ switch (operator)
+ {
+ case EQ:
+ case LT:
+ case LTE:
+ case GTE:
+ case GT:
+ case NEQ:
+ {
+ assert !column.isComplex() : "Only CONTAINS and CONTAINS_KEY are supported for 'complex' types";
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ // Note that CQL expressions are always of the form 'x < 4', i.e. the tested value is on the left.
+ return foundValue != null && operator.isSatisfiedBy(column.type, foundValue, value);
+ }
+ case CONTAINS:
+ assert column.type.isCollection();
+ CollectionType<?> type = (CollectionType<?>)column.type;
+ if (column.isComplex())
+ {
+ Iterator<Cell> iter = row.getCells(column);
+ while (iter.hasNext())
+ {
+ Cell cell = iter.next();
+ if (type.kind == CollectionType.Kind.SET)
+ {
+ if (type.nameComparator().compare(cell.path().get(0), value) == 0) // set elements live in the cell path
+ return true;
+ }
+ else
+ {
+ if (type.valueComparator().compare(cell.value(), value) == 0)
+ return true;
+ }
+ }
+ return false;
+ }
+ else
+ {
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ if (foundValue == null)
+ return false;
+
+ switch (type.kind)
+ {
+ case LIST:
+ ListType<?> listType = (ListType<?>)type;
+ return listType.compose(foundValue).contains(listType.getElementsType().compose(value));
+ case SET:
+ SetType<?> setType = (SetType<?>)type;
+ return setType.compose(foundValue).contains(setType.getElementsType().compose(value));
+ case MAP:
+ MapType<?,?> mapType = (MapType<?, ?>)type;
+ return 
mapType.compose(foundValue).containsValue(mapType.getValuesType().compose(value));
+ }
+ throw new AssertionError();
+ }
+ case CONTAINS_KEY:
+ assert column.type.isCollection() && column.type instanceof MapType;
+ MapType<?, ?> mapType = (MapType<?, ?>)column.type;
+ if (column.isComplex())
+ {
+ return row.getCell(column, CellPath.create(value)) != null;
+ }
+ else
+ {
+ ByteBuffer foundValue = getValue(partitionKey, row);
+ return foundValue != null && mapType.getSerializer().getSerializedValue(foundValue, value, mapType.getKeysType()) != null;
+ }
+
+ case IN:
+ // It wouldn't be terribly hard to support this (though doing so would imply supporting
+ // IN for secondary indexes) but currently we don't.
+ throw new AssertionError();
+ }
+ throw new AssertionError();
+ }
+
+ @Override
+ public String toString()
+ {
+ AbstractType<?> type = column.type;
+ switch (operator)
+ {
+ case CONTAINS:
+ assert type instanceof CollectionType;
+ CollectionType<?> ct = (CollectionType<?>)type;
+ type = ct.kind == CollectionType.Kind.SET ? ct.nameComparator() : ct.valueComparator();
+ break;
+ case CONTAINS_KEY:
+ assert type instanceof MapType;
+ type = ((MapType<?, ?>)type).nameComparator();
+ break;
+ case IN:
+ type = ListType.getInstance(type, false);
+ break;
+ default:
+ break;
+ }
+ return String.format("%s %s %s", column.name, operator, type.getString(value));
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.SIMPLE;
+ }
+ }
+
+ /**
+ * An expression of the form 'column' ['key'] = 'value' (which is only
+ * supported when 'column' is a map).
+ */
+ private static class MapEqualityExpression extends Expression
+ {
+ private final ByteBuffer key;
+
+ public MapEqualityExpression(ColumnDefinition column, ByteBuffer key, Operator operator, ByteBuffer value)
+ {
+ super(column, operator, value);
+ assert column.type instanceof MapType && operator == Operator.EQ;
+ this.key = key;
+ }
+
+ @Override
+ public void validateForIndexing() throws InvalidRequestException
+ {
+ super.validateForIndexing();
+ checkNotNull(key, "Unsupported null value for key of map column %s", column.name);
+ checkBindValueSet(key, "Unsupported unset value for key of map column %s", column.name);
+ }
+
+ @Override
+ public ByteBuffer getIndexValue()
+ {
+ return CompositeType.build(key, value); // (key, value) composite; also the pre-3.0 wire form of this expression
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ assert key != null;
+ // We support null conditions for LWT (in ColumnCondition) but not for RowFilter.
+ // TODO: we should try to merge both code paths someday.
+ assert value != null;
+
+ if (row.isStatic() != column.isStatic())
+ return true; // only rows with the same static-ness as the column are constrained by it
+
+ MapType<?, ?> mt = (MapType<?, ?>)column.type;
+ if (column.isComplex())
+ {
+ Cell cell = row.getCell(column, CellPath.create(key));
+ return cell != null && mt.valueComparator().compare(cell.value(), value) == 0;
+ }
+ else
+ {
+ ByteBuffer serializedMap = getValue(partitionKey, row);
+ if (serializedMap == null)
+ return false;
+
+ ByteBuffer foundValue = mt.getSerializer().getSerializedValue(serializedMap, key, mt.getKeysType());
+ return foundValue != null && mt.valueComparator().compare(foundValue, value) == 0;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ MapType<?, ?> mt = (MapType<?, ?>)column.type;
+ return String.format("%s[%s] = %s", column.name, mt.nameComparator().getString(key), mt.valueComparator().getString(value));
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof MapEqualityExpression))
+ return false;
+
+ MapEqualityExpression that = 
(MapEqualityExpression)o;
+
+ return Objects.equal(this.column.name, that.column.name)
+ && Objects.equal(this.operator, that.operator)
+ && Objects.equal(this.key, that.key)
+ && Objects.equal(this.value, that.value);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hashCode(column.name, operator, key, value);
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.MAP_EQUALITY;
+ }
+ }
+
+ /**
+ * An expression of the form 'name' = 'value', but where 'name' is actually the
+ * clustering value for a compact table. This is only for thrift.
+ */
+ private static class ThriftExpression extends Expression
+ {
+ private final CFMetaData metadata;
+
+ public ThriftExpression(CFMetaData metadata, ByteBuffer name, Operator operator, ByteBuffer value)
+ {
+ super(makeDefinition(metadata, name), operator, value);
+ assert metadata.isCompactTable();
+ this.metadata = metadata;
+ }
+
+ private static ColumnDefinition makeDefinition(CFMetaData metadata, ByteBuffer name)
+ {
+ ColumnDefinition def = metadata.getColumnDefinition(name);
+ if (def != null)
+ return def;
+
+ // In thrift, we actually allow expressions on non-defined columns for the sake of filtering. To accommodate
+ // this we create a "fake" definition. This is messy but it works so is probably good enough.
+ return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null);
+ }
+
+ public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row)
+ {
+ assert value != null;
+
+ // On thrift queries, even if the column expression is a "static" one, we'll have converted it to a "dynamic"
+ // one in ThriftResultsMerger, so we always expect it to be a dynamic one. Further, we expect this is only
+ // called when the row clustering does match the column (see ThriftFilter above).
+ assert row.clustering().equals(makeCompactClustering(metadata, column.name.bytes));
+ Cell cell = row.getCell(metadata.compactValueColumn());
+ return cell != null && operator.isSatisfiedBy(column.type, cell.value(), value);
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s %s %s", column.name, operator, column.type.getString(value));
+ }
+
+ @Override
+ Kind kind()
+ {
+ return Kind.THRIFT_DYN_EXPR;
+ }
+ }
+
+ public static class Serializer
+ {
+ public void serialize(RowFilter filter, DataOutputPlus out, int version) throws IOException
+ {
+ out.writeBoolean(filter instanceof ThriftFilter);
+ out.writeShort(filter.expressions.size());
+ for (Expression expr : filter.expressions)
+ Expression.serializer.serialize(expr, out, version);
+ }
+
+ public RowFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+ {
+ boolean forThrift = in.readBoolean();
+ int size = in.readUnsignedShort();
+ List<Expression> expressions = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ expressions.add(Expression.serializer.deserialize(in, version, metadata));
+ return forThrift
+ ? 
new ThriftFilter(expressions)
+ : new CQLFilter(expressions);
+ }
+
+ public long serializedSize(RowFilter filter, int version)
+ {
+ TypeSizes sizes = TypeSizes.NATIVE;
+ long size = 1 // forThrift
+ + sizes.sizeof((short)filter.expressions.size());
+ for (Expression expr : filter.expressions)
+ size += Expression.serializer.serializedSize(expr, version);
+ return size;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java b/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
deleted file mode 100644
index 4571161..0000000
--- a/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package org.apache.cassandra.db.filter; - -import java.nio.ByteBuffer; -import java.io.DataInput; -import java.io.IOException; -import java.util.*; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.service.ClientWarn; -import org.apache.cassandra.tracing.Tracing; - -public class SliceQueryFilter implements IDiskAtomFilter -{ - private static final Logger logger = LoggerFactory.getLogger(SliceQueryFilter.class); - - /** - * A special value for compositesToGroup that indicates that partitioned tombstones should not be included in results - * or count towards the limit. See CASSANDRA-8490 for more details on why this is needed (and done this way). 
- **/ - public static final int IGNORE_TOMBSTONED_PARTITIONS = -2; - - public final ColumnSlice[] slices; - public final boolean reversed; - public volatile int count; - public final int compositesToGroup; - - // Not serialized, just a ack for range slices to find the number of live column counted, even when we group - private ColumnCounter columnCounter; - - public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count) - { - this(new ColumnSlice(start, finish), reversed, count); - } - - public SliceQueryFilter(Composite start, Composite finish, boolean reversed, int count, int compositesToGroup) - { - this(new ColumnSlice(start, finish), reversed, count, compositesToGroup); - } - - public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count) - { - this(new ColumnSlice[]{ slice }, reversed, count); - } - - public SliceQueryFilter(ColumnSlice slice, boolean reversed, int count, int compositesToGroup) - { - this(new ColumnSlice[]{ slice }, reversed, count, compositesToGroup); - } - - /** - * Constructor that accepts multiple slices. All slices are assumed to be in the same direction (forward or - * reversed). 
- */ - public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count) - { - this(slices, reversed, count, -1); - } - - public SliceQueryFilter(ColumnSlice[] slices, boolean reversed, int count, int compositesToGroup) - { - this.slices = slices; - this.reversed = reversed; - this.count = count; - this.compositesToGroup = compositesToGroup; - } - - public SliceQueryFilter cloneShallow() - { - return new SliceQueryFilter(slices, reversed, count, compositesToGroup); - } - - public SliceQueryFilter withUpdatedCount(int newCount) - { - return new SliceQueryFilter(slices, reversed, newCount, compositesToGroup); - } - - public SliceQueryFilter withUpdatedSlices(ColumnSlice[] newSlices) - { - return new SliceQueryFilter(newSlices, reversed, count, compositesToGroup); - } - - /** Returns true if the slice includes static columns, false otherwise. */ - private boolean sliceIncludesStatics(ColumnSlice slice, CFMetaData cfm) - { - return cfm.hasStaticColumns() && - slice.includes(reversed ? cfm.comparator.reverseComparator() : cfm.comparator, cfm.comparator.staticPrefix().end()); - } - - public boolean hasStaticSlice(CFMetaData cfm) - { - for (ColumnSlice slice : slices) - if (sliceIncludesStatics(slice, cfm)) - return true; - - return false; - } - - /** - * Splits this filter into two SliceQueryFilters: one that slices only the static columns, and one that slices the - * remainder of the normal data. - * - * This should only be called when the filter is reversed and the filter is known to cover static columns (through - * hasStaticSlice()). 
- * - * @return a pair of (static, normal) SliceQueryFilters - */ - public Pair<SliceQueryFilter, SliceQueryFilter> splitOutStaticSlice(CFMetaData cfm) - { - assert reversed; - - Composite staticSliceEnd = cfm.comparator.staticPrefix().end(); - List<ColumnSlice> nonStaticSlices = new ArrayList<>(slices.length); - for (ColumnSlice slice : slices) - { - if (sliceIncludesStatics(slice, cfm)) - nonStaticSlices.add(new ColumnSlice(slice.start, staticSliceEnd)); - else - nonStaticSlices.add(slice); - } - - return Pair.create( - new SliceQueryFilter(staticSliceEnd, Composites.EMPTY, true, count, compositesToGroup), - new SliceQueryFilter(nonStaticSlices.toArray(new ColumnSlice[nonStaticSlices.size()]), true, count, compositesToGroup)); - } - - public SliceQueryFilter withUpdatedStart(Composite newStart, CFMetaData cfm) - { - Comparator<Composite> cmp = reversed ? cfm.comparator.reverseComparator() : cfm.comparator; - - // Check our slices to see if any fall before the new start (in which case they can be removed) or - // if they contain the new start (in which case they should start from the page start). However, if the - // slices would include static columns, we need to ensure they are also fetched, and so a separate - // slice for the static columns may be required. - // Note that if the query is reversed, we can't handle statics by simply adding a separate slice here, so - // the reversed case is handled by SliceFromReadCommand instead. See CASSANDRA-8502 for more details. 
- List<ColumnSlice> newSlices = new ArrayList<>(); - boolean pastNewStart = false; - for (ColumnSlice slice : slices) - { - if (pastNewStart) - { - newSlices.add(slice); - continue; - } - - if (slice.isBefore(cmp, newStart)) - { - if (!reversed && sliceIncludesStatics(slice, cfm)) - newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end())); - - continue; - } - else if (slice.includes(cmp, newStart)) - { - if (!reversed && sliceIncludesStatics(slice, cfm) && !newStart.isEmpty()) - newSlices.add(new ColumnSlice(Composites.EMPTY, cfm.comparator.staticPrefix().end())); - - newSlices.add(new ColumnSlice(newStart, slice.finish)); - } - else - { - newSlices.add(slice); - } - - pastNewStart = true; - } - return withUpdatedSlices(newSlices.toArray(new ColumnSlice[newSlices.size()])); - } - - public Iterator<Cell> getColumnIterator(ColumnFamily cf) - { - assert cf != null; - return reversed ? cf.reverseIterator(slices) : cf.iterator(slices); - } - - public OnDiskAtomIterator getColumnIterator(final DecoratedKey key, final ColumnFamily cf) - { - assert cf != null; - final Iterator<Cell> iter = getColumnIterator(cf); - - return new OnDiskAtomIterator() - { - public ColumnFamily getColumnFamily() - { - return cf; - } - - public DecoratedKey getKey() - { - return key; - } - - public boolean hasNext() - { - return iter.hasNext(); - } - - public OnDiskAtom next() - { - return iter.next(); - } - - public void close() throws IOException { } - - public void remove() - { - throw new UnsupportedOperationException(); - } - }; - } - - public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, DecoratedKey key) - { - return sstable.iterator(key, slices, reversed); - } - - public OnDiskAtomIterator getSSTableColumnIterator(SSTableReader sstable, FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry) - { - return sstable.iterator(file, key, slices, reversed, indexEntry); - } - - public Comparator<Cell> getColumnComparator(CellNameType 
comparator) - { - return reversed ? comparator.columnReverseComparator() : comparator.columnComparator(false); - } - - public void collectReducedColumns(ColumnFamily container, Iterator<Cell> reducedColumns, DecoratedKey key, int gcBefore, long now) - { - columnCounter = columnCounter(container.getComparator(), now); - DeletionInfo.InOrderTester tester = container.deletionInfo().inOrderTester(reversed); - - while (reducedColumns.hasNext()) - { - Cell cell = reducedColumns.next(); - - if (logger.isTraceEnabled()) - logger.trace("collecting {} of {}: {}", columnCounter.live(), count, cell.getString(container.getComparator())); - - // An expired tombstone will be immediately discarded in memory, and needn't be counted. - // Neither should be any cell shadowed by a range- or a partition tombstone. - if (cell.getLocalDeletionTime() < gcBefore || !columnCounter.count(cell, tester)) - continue; - - if (columnCounter.live() > count) - break; - - if (respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneFailureThreshold()) - { - Tracing.trace("Scanned over {} tombstones; query aborted (see tombstone_failure_threshold); slices={}", - DatabaseDescriptor.getTombstoneFailureThreshold(), getSlicesInfo(container)); - - throw new TombstoneOverwhelmingException(columnCounter.tombstones(), - count, - container.metadata().ksName, - container.metadata().cfName, - container.getComparator().getString(cell.name()), - getSlicesInfo(container)); - } - - container.appendColumn(cell); - } - - boolean warnTombstones = logger.isWarnEnabled() && respectTombstoneThresholds() && columnCounter.tombstones() > DatabaseDescriptor.getTombstoneWarnThreshold(); - if (warnTombstones) - { - String msg = String.format("Read %d live and %d tombstone cells in %s.%s for key: %1.512s (see tombstone_warn_threshold). 
%d columns were requested, slices=%1.512s", - columnCounter.live(), - columnCounter.tombstones(), - container.metadata().ksName, - container.metadata().cfName, - container.metadata().getKeyValidator().getString(key.getKey()), - count, - getSlicesInfo(container)); - ClientWarn.warn(msg); - logger.warn(msg); - } - Tracing.trace("Read {} live and {} tombstone cells{}", - columnCounter.live(), - columnCounter.tombstones(), - warnTombstones ? " (see tombstone_warn_threshold)" : ""); - } - - private String getSlicesInfo(ColumnFamily container) - { - StringBuilder sb = new StringBuilder(); - CellNameType type = container.metadata().comparator; - for (ColumnSlice sl : slices) - { - assert sl != null; - - sb.append('['); - sb.append(type.getString(sl.start)); - sb.append('-'); - sb.append(type.getString(sl.finish)); - sb.append(']'); - } - return sb.toString(); - } - - protected boolean respectTombstoneThresholds() - { - return true; - } - - public int getLiveCount(ColumnFamily cf, long now) - { - return columnCounter(cf.getComparator(), now).countAll(cf).live(); - } - - public ColumnCounter columnCounter(CellNameType comparator, long now) - { - if (compositesToGroup < 0) - return new ColumnCounter(now); - else if (compositesToGroup == 0) - return new ColumnCounter.GroupByPrefix(now, null, 0); - else if (reversed) - return new ColumnCounter.GroupByPrefixReversed(now, comparator, compositesToGroup); - else - return new ColumnCounter.GroupByPrefix(now, comparator, compositesToGroup); - } - - public void trim(ColumnFamily cf, int trimTo, long now) - { - // each cell can increment the count by at most one, so if we have fewer cells than trimTo, we can skip trimming - if (cf.getColumnCount() < trimTo) - return; - - ColumnCounter counter = columnCounter(cf.getComparator(), now); - - Collection<Cell> cells = reversed - ? 
cf.getReverseSortedColumns() - : cf.getSortedColumns(); - - DeletionInfo.InOrderTester tester = cf.deletionInfo().inOrderTester(reversed); - - for (Iterator<Cell> iter = cells.iterator(); iter.hasNext(); ) - { - Cell cell = iter.next(); - counter.count(cell, tester); - - if (counter.live() > trimTo) - { - iter.remove(); - while (iter.hasNext()) - { - iter.next(); - iter.remove(); - } - } - } - } - - public Composite start() - { - return this.slices[0].start; - } - - public Composite finish() - { - return this.slices[slices.length - 1].finish; - } - - public void setStart(Composite start) - { - assert slices.length == 1; - this.slices[0] = new ColumnSlice(start, this.slices[0].finish); - } - - public int lastCounted() - { - // If we have a slice limit set, columnCounter.live() can overcount by one because we have to call - // columnCounter.count() before we can tell if we've exceeded the slice limit (and accordingly, should not - // add the cells to returned container). To deal with this overcounting, we take the min of the slice - // limit and the counter's count. - return columnCounter == null ? 0 : Math.min(columnCounter.live(), count); - } - - public int lastTombstones() - { - return columnCounter == null ? 0 : columnCounter.tombstones(); - } - - public int lastLive() - { - return columnCounter == null ? 
0 : columnCounter.live(); - } - - @Override - public String toString() - { - return "SliceQueryFilter [reversed=" + reversed + ", slices=" + Arrays.toString(slices) + ", count=" + count + ", toGroup = " + compositesToGroup + "]"; - } - - public boolean isReversed() - { - return reversed; - } - - public void updateColumnsLimit(int newLimit) - { - count = newLimit; - } - - public boolean maySelectPrefix(CType type, Composite prefix) - { - for (ColumnSlice slice : slices) - if (slice.includes(type, prefix)) - return true; - return false; - } - - public boolean shouldInclude(SSTableReader sstable) - { - List<ByteBuffer> minColumnNames = sstable.getSSTableMetadata().minColumnNames; - List<ByteBuffer> maxColumnNames = sstable.getSSTableMetadata().maxColumnNames; - CellNameType comparator = sstable.metadata.comparator; - - if (minColumnNames.isEmpty() || maxColumnNames.isEmpty()) - return true; - - for (ColumnSlice slice : slices) - if (slice.intersects(minColumnNames, maxColumnNames, comparator, reversed)) - return true; - - return false; - } - - public boolean isHeadFilter() - { - return slices.length == 1 && slices[0].start.isEmpty() && !reversed; - } - - public boolean countCQL3Rows(CellNameType comparator) - { - // If comparator is dense a cell == a CQL3 rows so we're always counting CQL3 rows - // in particular. Otherwise, we do so only if we group the cells into CQL rows. - return comparator.isDense() || compositesToGroup >= 0; - } - - public boolean isFullyCoveredBy(ColumnFamily cf, long now) - { - // cf is the beginning of a partition. It covers this filter if: - // 1) either this filter requests the head of the partition and request less - // than what cf has to offer (note: we do need to use getLiveCount() for that - // as it knows if the filter count cells or CQL3 rows). - // 2) the start and finish bound of this filter are included in cf. 
- if (isHeadFilter() && count <= getLiveCount(cf, now)) - return true; - - if (start().isEmpty() || finish().isEmpty() || !cf.hasColumns()) - return false; - - Composite low = isReversed() ? finish() : start(); - Composite high = isReversed() ? start() : finish(); - - CellName first = cf.iterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name(); - CellName last = cf.reverseIterator(ColumnSlice.ALL_COLUMNS_ARRAY).next().name(); - - return cf.getComparator().compare(first, low) <= 0 - && cf.getComparator().compare(high, last) <= 0; - } - - public static class Serializer implements IVersionedSerializer<SliceQueryFilter> - { - private CType type; - - public Serializer(CType type) - { - this.type = type; - } - - public void serialize(SliceQueryFilter f, DataOutputPlus out, int version) throws IOException - { - out.writeInt(f.slices.length); - for (ColumnSlice slice : f.slices) - type.sliceSerializer().serialize(slice, out, version); - out.writeBoolean(f.reversed); - int count = f.count; - out.writeInt(count); - - out.writeInt(f.compositesToGroup); - } - - public SliceQueryFilter deserialize(DataInput in, int version) throws IOException - { - ColumnSlice[] slices; - slices = new ColumnSlice[in.readInt()]; - for (int i = 0; i < slices.length; i++) - slices[i] = type.sliceSerializer().deserialize(in, version); - boolean reversed = in.readBoolean(); - int count = in.readInt(); - int compositesToGroup = in.readInt(); - - return new SliceQueryFilter(slices, reversed, count, compositesToGroup); - } - - public long serializedSize(SliceQueryFilter f, int version) - { - TypeSizes sizes = TypeSizes.NATIVE; - - int size = 0; - size += sizes.sizeof(f.slices.length); - for (ColumnSlice slice : f.slices) - size += type.sliceSerializer().serializedSize(slice, version); - size += sizes.sizeof(f.reversed); - size += sizes.sizeof(f.count); - - size += sizes.sizeof(f.compositesToGroup); - return size; - } - } - - public Iterator<RangeTombstone> getRangeTombstoneIterator(final ColumnFamily 
source) - { - final DeletionInfo delInfo = source.deletionInfo(); - if (!delInfo.hasRanges() || slices.length == 0) - return Iterators.emptyIterator(); - - return new AbstractIterator<RangeTombstone>() - { - private int sliceIdx = 0; - private Iterator<RangeTombstone> sliceIter = currentRangeIter(); - - protected RangeTombstone computeNext() - { - while (true) - { - if (sliceIter.hasNext()) - return sliceIter.next(); - - if (!nextSlice()) - return endOfData(); - - sliceIter = currentRangeIter(); - } - } - - private Iterator<RangeTombstone> currentRangeIter() - { - ColumnSlice slice = slices[reversed ? (slices.length - 1 - sliceIdx) : sliceIdx]; - return reversed ? delInfo.rangeIterator(slice.finish, slice.start) - : delInfo.rangeIterator(slice.start, slice.finish); - } - - private boolean nextSlice() - { - return ++sliceIdx < slices.length; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java index 7624e1b..98b539e 100644 --- a/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java +++ b/src/java/org/apache/cassandra/db/filter/TombstoneOverwhelmingException.java @@ -18,49 +18,51 @@ */ package org.apache.cassandra.db.filter; -import org.apache.cassandra.db.DecoratedKey; +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.*; public class TombstoneOverwhelmingException extends RuntimeException { - private final int numTombstones; - private final int numRequested; - private final String ksName; - private final String cfName; - private final String lastCellName; - private final String slicesInfo; - private String 
partitionKey = null; - - public TombstoneOverwhelmingException(int numTombstones, - int numRequested, - String ksName, - String cfName, - String lastCellName, - String slicesInfo) + public TombstoneOverwhelmingException(int numTombstones, String query, CFMetaData metadata, DecoratedKey lastPartitionKey, ClusteringPrefix lastClustering) { - this.numTombstones = numTombstones; - this.numRequested = numRequested; - this.ksName = ksName; - this.cfName = cfName; - this.lastCellName = lastCellName; - this.slicesInfo = slicesInfo; + super(String.format("Scanned over %d tombstones during query '%s' (last scanned row partion key was (%s)); query aborted", + numTombstones, query, makePKString(metadata, lastPartitionKey.getKey(), lastClustering))); } - public void setKey(DecoratedKey key) + private static String makePKString(CFMetaData metadata, ByteBuffer partitionKey, ClusteringPrefix clustering) { - if (key != null) - partitionKey = key.toString(); - } + StringBuilder sb = new StringBuilder(); - public String getLocalizedMessage() - { - return getMessage(); - } + if (clustering.size() > 0) + sb.append("("); - public String getMessage() - { - return String.format( - "Scanned over %d tombstones in %s.%s; %d columns were requested; query aborted " + - "(see tombstone_failure_threshold); partitionKey=%s; lastCell=%s; slices=%s", - numTombstones, ksName, cfName, numRequested, partitionKey, lastCellName, slicesInfo); + // TODO: We should probably make that a lot easier/transparent for partition keys + AbstractType<?> pkType = metadata.getKeyValidator(); + if (pkType instanceof CompositeType) + { + CompositeType ct = (CompositeType)pkType; + ByteBuffer[] values = ct.split(partitionKey); + for (int i = 0; i < values.length; i++) + { + if (i > 0) + sb.append(", "); + sb.append(ct.types.get(i).getString(values[i])); + } + } + else + { + sb.append(pkType.getString(partitionKey)); + } + + if (clustering.size() > 0) + sb.append(")"); + + for (int i = 0; i < clustering.size(); i++) + 
sb.append(", ").append(metadata.comparator.subtype(i).getString(clustering.get(i))); + + return sb.toString(); } }
