http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
new file mode 100644
index 0000000..99140ef
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+
+import com.google.common.collect.SortedSetMultimap;
+import com.google.common.collect.TreeMultimap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Represents which (non-PK) columns (and optionally which sub-parts of complex columns) are selected
+ * by a query.
+ *
+ * In practice, this class covers two main cases:
+ *   1) most user queries have to internally query all columns, because the CQL semantics require us to know if
+ *      a row is live or not even if it has no values for the columns requested by the user (see #6588 for more
+ *      details). However, while we need to know, for each column, whether it has live values, we can avoid
+ *      sending the values of those columns that will not be returned to the user.
+ *   2) for some internal queries (and for queries using #6588 if we introduce it), we're fine querying
+ *      only some of the columns.
+ *
+ * For complex columns, this class allows finer-grained selection than the whole column, by selecting only
+ * some of its cells (either individual cells by path name, or some slice).
+ */
+public class ColumnFilter
+{
+    public static final Serializer serializer = new Serializer();
+
+    private static final Comparator<ColumnIdentifier> keyComparator = new Comparator<ColumnIdentifier>()
+    {
+        public int compare(ColumnIdentifier id1, ColumnIdentifier id2)
+        {
+            return ByteBufferUtil.compareUnsigned(id1.bytes, id2.bytes);
+        }
+    };
+    private static final Comparator<ColumnSubselection> valueComparator = new Comparator<ColumnSubselection>()
+    {
+        public int compare(ColumnSubselection s1, ColumnSubselection s2)
+        {
+            assert s1.column().name.equals(s2.column().name);
+            return s1.column().cellPathComparator().compare(s1.minIncludedPath(), s2.minIncludedPath());
+        }
+    };
+
+    // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved
+    // by the query, but the values for columns/cells not selected by 'selection' and 'subSelections' will be skipped.
+    // Otherwise, only the columns/cells returned by 'selection' and 'subSelections' will be returned at all.
+    private final boolean isFetchAll;
+
+    private final CFMetaData metadata; // can be null if !isFetchAll
+
+    private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value
+    private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null
+
+    private ColumnFilter(boolean isFetchAll,
+                         CFMetaData metadata,
+                         PartitionColumns columns,
+                         SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections)
+    {
+        this.isFetchAll = isFetchAll;
+        this.metadata = metadata;
+        this.selection = columns;
+        this.subSelections = subSelections;
+    }
+
+    /**
+     * A selection that includes all columns (and their values).
+     */
+    public static ColumnFilter all(CFMetaData metadata)
+    {
+        return new ColumnFilter(true, metadata, null, null);
+    }
+
+    /**
+     * A selection that only fetches the provided columns.
+     * <p>
+     * Note that this shouldn't be used for CQL queries in general, as all columns should be queried to
+     * preserve CQL semantics (see the class javadoc). This is ok for some internal queries however (and
+     * for #6588 if/when we implement it).
+     */
+    public static ColumnFilter selection(PartitionColumns columns)
+    {
+        return new ColumnFilter(false, null, columns, null);
+    }
+
+    /**
+     * The columns that need to be fetched internally for this selection.
+     * <p>
+     * These are the columns that must be present in the internal rows returned by queries using this selection,
+     * not the columns that are actually queried by the user (see the class javadoc for details).
+     *
+     * @return the columns to fetch for this selection.
+     */
+    public PartitionColumns fetchedColumns()
+    {
+        return isFetchAll ? metadata.partitionColumns() : selection;
+    }
+
+    /**
+     * Whether the provided column is selected by this selection.
+     */
+    public boolean includes(ColumnDefinition column)
+    {
+        return isFetchAll || selection.contains(column);
+    }
+
+    /**
+     * Whether we can skip the value for the provided selected column.
+     */
+    public boolean canSkipValue(ColumnDefinition column)
+    {
+        return isFetchAll && selection != null && !selection.contains(column);
+    }
+
+    /**
+     * Whether the provided cell of a complex column is selected by this selection.
+     */
+    public boolean includes(Cell cell)
+    {
+        if (isFetchAll || subSelections == null || !cell.column().isComplex())
+            return true;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(cell.column().name);
+        if (s.isEmpty())
+            return true;
+
+        for (ColumnSubselection subSel : s)
+            if (subSel.includes(cell.path()))
+                return true;
+
+        return false;
+    }
+
+    /**
+     * Whether we can skip the value of the cell of a complex column.
+     */
+    public boolean canSkipValue(ColumnDefinition column, CellPath path)
+    {
+        if (!isFetchAll || subSelections == null || !column.isComplex())
+            return false;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+        if (s.isEmpty())
+            return false;
+
+        for (ColumnSubselection subSel : s)
+            if (subSel.includes(path))
+                return false;
+
+        return true;
+    }
+
+    /**
+     * Creates a new {@code Tester} to efficiently test the inclusion of cells of complex column
+     * {@code column}.
+     */
+    public Tester newTester(ColumnDefinition column)
+    {
+        if (subSelections == null || !column.isComplex())
+            return null;
+
+        SortedSet<ColumnSubselection> s = subSelections.get(column.name);
+        if (s.isEmpty())
+            return null;
+
+        return new Tester(s.iterator());
+    }
+
+    /**
+     * Returns a {@code ColumnFilter} builder that includes all columns (so the selections
+     * added to the builder are the columns/cells for which we shouldn't skip the values).
+     */
+    public static Builder allColumnsBuilder(CFMetaData metadata)
+    {
+        return new Builder(metadata);
+    }
+
+    /**
+     * Returns a {@code ColumnFilter} builder that includes only the columns/cells
+     * added to the builder.
+     */
+    public static Builder selectionBuilder()
+    {
+        return new Builder(null);
+    }
+
+    public static class Tester
+    {
+        private ColumnSubselection current;
+        private final Iterator<ColumnSubselection> iterator;
+
+        private Tester(Iterator<ColumnSubselection> iterator)
+        {
+            this.iterator = iterator;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            while (current == null)
+            {
+                if (!iterator.hasNext())
+                    return false;
+
+                current = iterator.next();
+                if (current.includes(path))
+                    return true;
+
+                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
+                    current = null;
+            }
+            return false;
+        }
+
+        public boolean canSkipValue(CellPath path)
+        {
+            while (current == null)
+            {
+                if (!iterator.hasNext())
+                    return false;
+
+                current = iterator.next();
+                if (current.includes(path))
+                    return false;
+
+                if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0)
+                    current = null;
+            }
+            return true;
+        }
+    }
+
+    public static class Builder
+    {
+        private final CFMetaData metadata;
+        private PartitionColumns.Builder selection;
+        private List<ColumnSubselection> subSelections;
+
+        public Builder(CFMetaData metadata)
+        {
+            this.metadata = metadata;
+        }
+
+        public Builder add(ColumnDefinition c)
+        {
+            if (selection == null)
+                selection = PartitionColumns.builder();
+            selection.add(c);
+            return this;
+        }
+
+        public Builder addAll(Iterable<ColumnDefinition> columns)
+        {
+            if (selection == null)
+                selection = PartitionColumns.builder();
+            selection.addAll(columns);
+            return this;
+        }
+
+        private Builder addSubSelection(ColumnSubselection subSelection)
+        {
+            add(subSelection.column());
+            if (subSelections == null)
+                subSelections = new ArrayList<>();
+            subSelections.add(subSelection);
+            return this;
+        }
+
+        public Builder slice(ColumnDefinition c, CellPath from, CellPath to)
+        {
+            return addSubSelection(ColumnSubselection.slice(c, from, to));
+        }
+
+        public Builder
select(ColumnDefinition c, CellPath elt) + { + return addSubSelection(ColumnSubselection.element(c, elt)); + } + + public ColumnFilter build() + { + boolean isFetchAll = metadata != null; + assert isFetchAll || selection != null; + + SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null; + if (subSelections != null) + { + s = TreeMultimap.create(keyComparator, valueComparator); + for (ColumnSubselection subSelection : subSelections) + s.put(subSelection.column().name, subSelection); + } + + return new ColumnFilter(isFetchAll, metadata, selection == null ? null : selection.build(), s); + } + } + + @Override + public String toString() + { + if (selection == null) + return "*"; + + Iterator<ColumnDefinition> defs = selection.selectOrderIterator(); + StringBuilder sb = new StringBuilder(); + appendColumnDef(sb, defs.next()); + while (defs.hasNext()) + appendColumnDef(sb.append(", "), defs.next()); + return sb.toString(); + } + + private void appendColumnDef(StringBuilder sb, ColumnDefinition column) + { + if (subSelections == null) + { + sb.append(column.name); + return; + } + + SortedSet<ColumnSubselection> s = subSelections.get(column.name); + if (s.isEmpty()) + { + sb.append(column.name); + return; + } + + int i = 0; + for (ColumnSubselection subSel : s) + sb.append(i++ == 0 ? "" : ", ").append(column.name).append(subSel); + } + + public static class Serializer + { + private static final int IS_FETCH_ALL_MASK = 0x01; + private static final int HAS_SELECTION_MASK = 0x02; + private static final int HAS_SUB_SELECTIONS_MASK = 0x04; + + private int makeHeaderByte(ColumnFilter selection) + { + return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) + | (selection.selection != null ? HAS_SELECTION_MASK : 0) + | (selection.subSelections != null ? HAS_SUB_SELECTIONS_MASK : 0); + } + + public void serialize(ColumnFilter selection, DataOutputPlus out, int version) throws IOException + { + out.writeByte(makeHeaderByte(selection)); + + if (selection.selection != null) + { + Columns.serializer.serialize(selection.selection.statics, out); + Columns.serializer.serialize(selection.selection.regulars, out); + } + + if (selection.subSelections != null) + { + out.writeShort(selection.subSelections.size()); + for (ColumnSubselection subSel : selection.subSelections.values()) + ColumnSubselection.serializer.serialize(subSel, out, version); + } + } + + public ColumnFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + { + int header = in.readUnsignedByte(); + boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0; + boolean hasSelection = (header & HAS_SELECTION_MASK) != 0; + boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0; + + PartitionColumns selection = null; + if (hasSelection) + { + Columns statics = Columns.serializer.deserialize(in, metadata); + Columns regulars = Columns.serializer.deserialize(in, metadata); + selection = new PartitionColumns(statics, regulars); + } + + SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null; + if (hasSubSelections) + { + subSelections = TreeMultimap.create(keyComparator, valueComparator); + int size = in.readUnsignedShort(); + for (int i = 0; i < size; i++) + { + ColumnSubselection subSel = ColumnSubselection.serializer.deserialize(in, version, metadata); + subSelections.put(subSel.column().name, subSel); + } + } + + return new ColumnFilter(isFetchAll, isFetchAll ? 
metadata : null, selection, subSelections); + } + + public long serializedSize(ColumnFilter selection, int version, TypeSizes sizes) + { + long size = 1; // header byte + + if (selection.selection != null) + { + size += Columns.serializer.serializedSize(selection.selection.statics, sizes); + size += Columns.serializer.serializedSize(selection.selection.regulars, sizes); + } + + if (selection.subSelections != null) + { + + size += sizes.sizeof((short)selection.subSelections.size()); + for (ColumnSubselection subSel : selection.subSelections.values()) + size += ColumnSubselection.serializer.serializedSize(subSel, version, sizes); + } + + return size; + } + } +}
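[Editor's aside, not part of the patch: a minimal sketch of how the two cases from the ColumnFilter javadoc map onto the builder API above. The names `metadata` (a CFMetaData), `mapColumn` (a complex ColumnDefinition) and the CellPath bounds `from`/`to` are hypothetical stand-ins, not identifiers from this commit.]

    // Case 1: a user CQL query. Every column is fetched so row liveness can be
    // decided, but values are only kept for the sub-selected cells.
    ColumnFilter cqlFilter = ColumnFilter.allColumnsBuilder(metadata)
                                         .slice(mapColumn, from, to) // hypothetical bounds
                                         .build();

    // Case 2: an internal query that genuinely needs only the selected columns.
    ColumnFilter internalFilter = ColumnFilter.selectionBuilder()
                                              .add(mapColumn)
                                              .build();

    // The resulting filters then drive what includes()/canSkipValue() report
    // for each column and cell during reads.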
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnSlice.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java b/src/java/org/apache/cassandra/db/filter/ColumnSlice.java deleted file mode 100644 index 1cc348c..0000000 --- a/src/java/org/apache/cassandra/db/filter/ColumnSlice.java +++ /dev/null @@ -1,289 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.filter; - -import java.io.*; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; - -public class ColumnSlice -{ - public static final ColumnSlice ALL_COLUMNS = new ColumnSlice(Composites.EMPTY, Composites.EMPTY); - public static final ColumnSlice[] ALL_COLUMNS_ARRAY = new ColumnSlice[]{ ALL_COLUMNS }; - - public final Composite start; - public final Composite finish; - - public ColumnSlice(Composite start, Composite finish) - { - assert start != null && finish != null; - this.start = start; - this.finish = finish; - } - - public boolean isAlwaysEmpty(CellNameType comparator, boolean reversed) - { - Comparator<Composite> orderedComparator = reversed ? comparator.reverseComparator() : comparator; - return !start.isEmpty() && !finish.isEmpty() && orderedComparator.compare(start, finish) > 0; - } - - public boolean includes(Comparator<Composite> cmp, Composite name) - { - return (start.isEmpty() || cmp.compare(start, name) <= 0) && (finish.isEmpty() || cmp.compare(finish, name) >= 0); - } - - public boolean isBefore(Comparator<Composite> cmp, Composite name) - { - return !finish.isEmpty() && cmp.compare(finish, name) < 0; - } - - public boolean intersects(List<ByteBuffer> minCellNames, List<ByteBuffer> maxCellNames, CellNameType comparator, boolean reversed) - { - Composite sStart = reversed ? finish : start; - Composite sEnd = reversed ? start : finish; - - if (compare(sStart, maxCellNames, comparator, true) > 0 || compare(sEnd, minCellNames, comparator, false) < 0) - return false; - - // We could safely return true here, but there's a minor optimization: if the first component is restricted - // to a single value, we can check that the second component falls within the min/max for that component - // (and repeat for all components). 
- for (int i = 0; i < minCellNames.size() && i < maxCellNames.size(); i++) - { - AbstractType<?> t = comparator.subtype(i); - ByteBuffer s = i < sStart.size() ? sStart.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER; - ByteBuffer f = i < sEnd.size() ? sEnd.get(i) : ByteBufferUtil.EMPTY_BYTE_BUFFER; - - // we already know the first component falls within its min/max range (otherwise we wouldn't get here) - if (i > 0 && (i < sEnd.size() && t.compare(f, minCellNames.get(i)) < 0 || - i < sStart.size() && t.compare(s, maxCellNames.get(i)) > 0)) - return false; - - // if this component isn't equal in the start and finish, we don't need to check any more - if (i >= sStart.size() || i >= sEnd.size() || t.compare(s, f) != 0) - break; - } - - return true; - } - - /** Helper method for intersects() */ - private int compare(Composite sliceBounds, List<ByteBuffer> sstableBounds, CellNameType comparator, boolean isSliceStart) - { - for (int i = 0; i < sstableBounds.size(); i++) - { - if (i >= sliceBounds.size()) - { - // When isSliceStart is true, we're comparing the end of the slice against the min cell name for the sstable, - // so the slice is something like [(1, 0), (1, 0)], and the sstable max is something like (1, 0, 1). - // We want to return -1 (slice start is smaller than max column name) so that we say the slice intersects. - // The opposite is true when dealing with the end slice. For example, with the same slice and a min - // cell name of (1, 0, 1), we want to return 1 (slice end is bigger than min column name). - return isSliceStart ? -1 : 1; - } - - int comparison = comparator.subtype(i).compare(sliceBounds.get(i), sstableBounds.get(i)); - if (comparison != 0) - return comparison; - } - - // the slice bound and sstable bound have been equal in all components so far - if (sliceBounds.size() > sstableBounds.size()) - { - // We have the opposite situation from the one described above. With a slice of [(1, 0), (1, 0)], - // and a min/max cell name of (1), we want to say the slice start is smaller than the max and the slice - // end is larger than the min. - return isSliceStart ? -1 : 1; - } - - return 0; - } - - /** - * Validates that the provided slice array contains only non-overlapped slices valid for a query {@code reversed} - * or not on a table using {@code comparator}. - */ - public static boolean validateSlices(ColumnSlice[] slices, CellNameType type, boolean reversed) - { - Comparator<Composite> comparator = reversed ? type.reverseComparator() : type; - - for (int i = 0; i < slices.length; i++) - { - Composite start = slices[i].start; - Composite finish = slices[i].finish; - - if (start.isEmpty() || finish.isEmpty()) - { - if (start.isEmpty() && i > 0) - return false; - - if (finish.isEmpty()) - return i == slices.length - 1; - } - else - { - // !finish.isEmpty() is imposed by prior loop - if (i > 0 && comparator.compare(slices[i - 1].finish, start) >= 0) - return false; - - if (comparator.compare(start, finish) > 0) - return false; - } - } - return true; - } - - /** - * Takes an array of slices (potentially overlapping and in any order, though each individual slice must have - * its start before or equal its end in {@code comparator} orde) and return an equivalent array of non-overlapping - * slices in {@code comparator order}. - * - * @param slices an array of slices. This may be modified by this method. - * @param comparator the order in which to sort the slices. - * @return the smallest possible array of non-overlapping slices in {@code compator} order. 
If the original - * slices are already non-overlapping and in comparator order, this may or may not return the provided slices - * directly. - */ - public static ColumnSlice[] deoverlapSlices(ColumnSlice[] slices, final Comparator<Composite> comparator) - { - if (slices.length <= 1) - return slices; - - Arrays.sort(slices, new Comparator<ColumnSlice>() - { - @Override - public int compare(ColumnSlice s1, ColumnSlice s2) - { - if (s1.start.isEmpty() || s2.start.isEmpty()) - { - if (s1.start.isEmpty() != s2.start.isEmpty()) - return s1.start.isEmpty() ? -1 : 1; - } - else - { - int c = comparator.compare(s1.start, s2.start); - if (c != 0) - return c; - } - - // For the finish, empty always means greater - return s1.finish.isEmpty() || s2.finish.isEmpty() - ? (s1.finish.isEmpty() ? 1 : -1) - : comparator.compare(s1.finish, s2.finish); - } - }); - - List<ColumnSlice> slicesCopy = new ArrayList<>(slices.length); - - ColumnSlice last = slices[0]; - - for (int i = 1; i < slices.length; i++) - { - ColumnSlice s2 = slices[i]; - - boolean includesStart = last.includes(comparator, s2.start); - boolean includesFinish = s2.finish.isEmpty() ? last.finish.isEmpty() : last.includes(comparator, s2.finish); - - if (includesStart && includesFinish) - continue; - - if (!includesStart && !includesFinish) - { - slicesCopy.add(last); - last = s2; - continue; - } - - if (includesStart) - { - last = new ColumnSlice(last.start, s2.finish); - continue; - } - - assert !includesFinish; - } - - slicesCopy.add(last); - - return slicesCopy.toArray(new ColumnSlice[slicesCopy.size()]); - } - - @Override - public final int hashCode() - { - int hashCode = 31 + start.hashCode(); - return 31*hashCode + finish.hashCode(); - } - - @Override - public final boolean equals(Object o) - { - if(!(o instanceof ColumnSlice)) - return false; - ColumnSlice that = (ColumnSlice)o; - return start.equals(that.start) && finish.equals(that.finish); - } - - @Override - public String toString() - { - return "[" + ByteBufferUtil.bytesToHex(start.toByteBuffer()) + ", " + ByteBufferUtil.bytesToHex(finish.toByteBuffer()) + "]"; - } - - public static class Serializer implements IVersionedSerializer<ColumnSlice> - { - private final CType type; - - public Serializer(CType type) - { - this.type = type; - } - - public void serialize(ColumnSlice cs, DataOutputPlus out, int version) throws IOException - { - ISerializer<Composite> serializer = type.serializer(); - serializer.serialize(cs.start, out); - serializer.serialize(cs.finish, out); - } - - public ColumnSlice deserialize(DataInput in, int version) throws IOException - { - ISerializer<Composite> serializer = type.serializer(); - Composite start = serializer.deserialize(in); - Composite finish = serializer.deserialize(in); - return new ColumnSlice(start, finish); - } - - public long serializedSize(ColumnSlice cs, int version) - { - ISerializer<Composite> serializer = type.serializer(); - return serializer.serializedSize(cs.start, TypeSizes.NATIVE) + serializer.serializedSize(cs.finish, TypeSizes.NATIVE); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java new file mode 100644 index 0000000..35db6f2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java @@ -0,0 +1,233 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.db.rows.CellPath;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CollectionType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Handles the selection of a subpart of a column.
+ * <p>
+ * This only makes sense for complex columns. For those, it allows, for instance,
+ * selecting only a slice of a map.
+ */
+public abstract class ColumnSubselection
+{
+    public static final Serializer serializer = new Serializer();
+
+    private enum Kind { SLICE, ELEMENT }
+
+    protected final ColumnDefinition column;
+
+    protected ColumnSubselection(ColumnDefinition column)
+    {
+        this.column = column;
+    }
+
+    public static ColumnSubselection slice(ColumnDefinition column, CellPath from, CellPath to)
+    {
+        assert column.isComplex() && column.type instanceof CollectionType;
+        assert from.size() <= 1 && to.size() <= 1;
+        return new Slice(column, from, to);
+    }
+
+    public static ColumnSubselection element(ColumnDefinition column, CellPath elt)
+    {
+        assert column.isComplex() && column.type instanceof CollectionType;
+        assert elt.size() == 1;
+        return new Element(column, elt);
+    }
+
+    public ColumnDefinition column()
+    {
+        return column;
+    }
+
+    protected abstract Kind kind();
+
+    public abstract CellPath minIncludedPath();
+    public abstract CellPath maxIncludedPath();
+    public abstract boolean includes(CellPath path);
+
+    private static class Slice extends ColumnSubselection
+    {
+        private final CellPath from;
+        private final CellPath to;
+
+        private Slice(ColumnDefinition column, CellPath from, CellPath to)
+        {
+            super(column);
+            this.from = from;
+            this.to = to;
+        }
+
+        protected Kind kind()
+        {
+            return Kind.SLICE;
+        }
+
+        public CellPath minIncludedPath()
+        {
+            return from;
+        }
+
+        public CellPath maxIncludedPath()
+        {
+            return to;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            Comparator<CellPath> cmp = column.cellPathComparator();
+            return cmp.compare(from, path) <= 0 && cmp.compare(path, to) <= 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            // This assumes we're dealing with a collection, since that's the only thing this is used for so far.
+            AbstractType<?> type = ((CollectionType<?>)column().type).nameComparator();
+            return String.format("[%s:%s]",
+                                 from == CellPath.BOTTOM ? "" : type.getString(from.get(0)),
+                                 to == CellPath.TOP ? "" : type.getString(to.get(0)));
+        }
+    }
+
+    private static class Element extends ColumnSubselection
+    {
+        private final CellPath element;
+
+        private Element(ColumnDefinition column, CellPath elt)
+        {
+            super(column);
+            this.element = elt;
+        }
+
+        protected Kind kind()
+        {
+            return Kind.ELEMENT;
+        }
+
+        public CellPath minIncludedPath()
+        {
+            return element;
+        }
+
+        public CellPath maxIncludedPath()
+        {
+            return element;
+        }
+
+        public boolean includes(CellPath path)
+        {
+            Comparator<CellPath> cmp = column.cellPathComparator();
+            return cmp.compare(element, path) == 0;
+        }
+
+        @Override
+        public String toString()
+        {
+            // This assumes we're dealing with a collection, since that's the only thing this is used for so far.
+            AbstractType<?> type = ((CollectionType<?>)column().type).nameComparator();
+            return String.format("[%s]", type.getString(element.get(0)));
+        }
+    }
+
+    public static class Serializer
+    {
+        public void serialize(ColumnSubselection subSel, DataOutputPlus out, int version) throws IOException
+        {
+            ColumnDefinition column = subSel.column();
+            ByteBufferUtil.writeWithShortLength(column.name.bytes, out);
+            out.writeByte(subSel.kind().ordinal());
+            switch (subSel.kind())
+            {
+                case SLICE:
+                    Slice slice = (Slice)subSel;
+                    column.cellPathSerializer().serialize(slice.from, out);
+                    column.cellPathSerializer().serialize(slice.to, out);
+                    return;
+                case ELEMENT:
+                    Element eltSelection = (Element)subSel;
+                    column.cellPathSerializer().serialize(eltSelection.element, out);
+                    return;
+            }
+            throw new AssertionError();
+        }
+
+        public ColumnSubselection deserialize(DataInput in, int version, CFMetaData metadata) throws IOException
+        {
+            ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+            ColumnDefinition column = metadata.getColumnDefinition(name);
+            if (column == null)
+            {
+                // If we don't find the definition, it could be that we have data for a dropped column, and we
+                // shouldn't fail deserialization because of that. So we grab a "fake" ColumnDefinition that
+                // ensures proper deserialization. The column will be ignored later on anyway.
+                column = metadata.getDroppedColumnDefinition(name);
+                if (column == null)
+                    throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+            }
+
+            Kind kind = Kind.values()[in.readUnsignedByte()];
+            switch (kind)
+            {
+                case SLICE:
+                    CellPath from = column.cellPathSerializer().deserialize(in);
+                    CellPath to = column.cellPathSerializer().deserialize(in);
+                    return new Slice(column, from, to);
+                case ELEMENT:
+                    CellPath elt = column.cellPathSerializer().deserialize(in);
+                    return new Element(column, elt);
+            }
+            throw new AssertionError();
+        }
+
+        public long serializedSize(ColumnSubselection subSel, int version, TypeSizes sizes)
+        {
+            long size = 0;
+
+            ColumnDefinition column = subSel.column();
+            size += sizes.sizeofWithShortLength(column.name.bytes);
+            size += 1; // kind
+            switch (subSel.kind())
+            {
+                case SLICE:
+                    Slice slice = (Slice)subSel;
+                    size += column.cellPathSerializer().serializedSize(slice.from, sizes);
+                    size += column.cellPathSerializer().serializedSize(slice.to, sizes);
+                    break;
+                case ELEMENT:
+                    Element element = (Element)subSel;
+                    size += column.cellPathSerializer().serializedSize(element.element, sizes);
+                    break;
+            }
+            return size;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/DataLimits.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java
new file mode 100644
index 0000000..42bfa4e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java
@@ -0,0 +1,737 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.filter;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+/**
+ * Object in charge of tracking whether we have fetched enough data for a given query.
+ *
+ * The reason this is not just a simple integer is that Thrift and CQL3 count
+ * things in different ways; this class abstracts those differences.
+ */ +public abstract class DataLimits +{ + public static final Serializer serializer = new Serializer(); + + public static final DataLimits NONE = new CQLLimits(Integer.MAX_VALUE) + { + @Override + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) + { + return false; + } + + @Override + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + { + return iter; + } + + @Override + public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) + { + return iter; + } + }; + + // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per + // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering. + public static final DataLimits DISTINCT_NONE = new CQLLimits(Integer.MAX_VALUE, 1, true); + + private enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT } + + public static DataLimits cqlLimits(int cqlRowLimit) + { + return new CQLLimits(cqlRowLimit); + } + + public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit) + { + return new CQLLimits(cqlRowLimit, perPartitionLimit); + } + + public static DataLimits distinctLimits(int cqlRowLimit) + { + return CQLLimits.distinct(cqlRowLimit); + } + + public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit) + { + return new ThriftLimits(partitionLimit, cellPerPartitionLimit); + } + + public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit) + { + return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); + } + + protected abstract Kind kind(); + + public abstract boolean isUnlimited(); + + public abstract DataLimits forPaging(int pageSize); + public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining); + + public abstract DataLimits forShortReadRetry(int toFetch); + + public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec); + + /** + * Returns a new {@code Counter} for this limits. + * + * @param nowInSec the current time in second (to decide what is expired or not). + * @param assumeLiveData if true, the counter will assume that every row passed is live and won't + * thus check for liveness, otherwise it will. This should be {@code true} when used on a + * {@code RowIterator} (since it only returns live rows), false otherwise. + * @return a new {@code Counter} for this limits. + */ + public abstract Counter newCounter(int nowInSec, boolean assumeLiveData); + + /** + * The max number of results this limits enforces. + * <p> + * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for + * thrift, it means cells. The {@link #countsCells} allows to distinguish between the two cases if + * needed. + * + * @return the maximum number of results this limits enforces. 
+ */ + public abstract int count(); + + public abstract int perPartitionCount(); + + public abstract boolean countsCells(); + + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) + { + return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false)); + } + + public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec) + { + return new CountingUnfilteredRowIterator(iter, newCounter(nowInSec, false)); + } + + public PartitionIterator filter(PartitionIterator iter, int nowInSec) + { + return new CountingPartitionIterator(iter, this, nowInSec); + } + + /** + * Estimate the number of results (the definition of "results" will be rows for CQL queries + * and partitions for thrift ones) that a full scan of the provided cfs would yield. + */ + public abstract float estimateTotalResults(ColumnFamilyStore cfs); + + public interface Counter + { + public void newPartition(DecoratedKey partitionKey, Row staticRow); + public void newRow(Row row); + public void endOfPartition(); + + /** + * The number of results counted. + * <p> + * Note that the definition of "results" should be the same that for {@link #count}. + * + * @return the number of results counted. + */ + public int counted(); + + public int countedInCurrentPartition(); + + public boolean isDone(); + public boolean isDoneForPartition(); + } + + /** + * Limits used by CQL; this counts rows. + */ + private static class CQLLimits extends DataLimits + { + protected final int rowLimit; + protected final int perPartitionLimit; + + // Whether the query is a distinct query or not. This is currently not used by the code but prior experience + // shows that keeping the information around is wise and might be useful in the future. + protected final boolean isDistinct; + + private CQLLimits(int rowLimit) + { + this(rowLimit, Integer.MAX_VALUE); + } + + private CQLLimits(int rowLimit, int perPartitionLimit) + { + this(rowLimit, perPartitionLimit, false); + } + + private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct) + { + this.rowLimit = rowLimit; + this.perPartitionLimit = perPartitionLimit; + this.isDistinct = isDistinct; + } + + private static CQLLimits distinct(int rowLimit) + { + return new CQLLimits(rowLimit, 1, true); + } + + protected Kind kind() + { + return Kind.CQL_LIMIT; + } + + public boolean isUnlimited() + { + return rowLimit == Integer.MAX_VALUE && perPartitionLimit == Integer.MAX_VALUE; + } + + public DataLimits forPaging(int pageSize) + { + return new CQLLimits(pageSize, perPartitionLimit); + } + + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // When we do a short read retry, we're only ever querying the single partition on which we have a short read. So + // we use toFetch as the row limit and use no perPartitionLimit (it would be equivalent in practice to use toFetch + // for both argument or just for perPartitionLimit with no limit on rowLimit). + return new CQLLimits(toFetch, Integer.MAX_VALUE, isDistinct); + } + + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) + { + // We want the number of row that are currently live. 
Getting that precise number forces + // us to iterate the cached partition in general, but we can avoid that if: + // - The number of rows with at least one non-expiring cell is greater than what we ask, + // in which case we know we have enough live. + // - The number of rows is less than requested, in which case we know we won't have enough. + if (cached.rowsWithNonExpiringCells() >= rowLimit) + return true; + + if (cached.rowCount() < rowLimit) + return false; + + // Otherwise, we need to re-count + try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); + CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false))) + { + // Consume the iterator until we've counted enough + while (iter.hasNext() && !iter.counter().isDone()) + iter.next(); + return iter.counter().isDone(); + } + } + + public Counter newCounter(int nowInSec, boolean assumeLiveData) + { + return new CQLCounter(nowInSec, assumeLiveData); + } + + public int count() + { + return rowLimit; + } + + public int perPartitionCount() + { + return perPartitionLimit; + } + + public boolean countsCells() + { + return false; + } + + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // TODO: we should start storing stats on the number of rows (instead of the number of cells, which + // is what getMeanColumns returns) + float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount(); + return rowsPerPartition * (cfs.estimateKeys()); + } + + protected class CQLCounter implements Counter + { + protected final int nowInSec; + protected final boolean assumeLiveData; + + protected int rowCounted; + protected int rowInCurrentPartition; + + protected boolean hasLiveStaticRow; + + public CQLCounter(int nowInSec, boolean assumeLiveData) + { + this.nowInSec = nowInSec; + this.assumeLiveData = assumeLiveData; + } + + public void newPartition(DecoratedKey partitionKey, Row staticRow) + { + rowInCurrentPartition = 0; + if (!staticRow.isEmpty() && (assumeLiveData || staticRow.hasLiveData(nowInSec))) + hasLiveStaticRow = true; + } + + public void endOfPartition() + { + // Normally, we don't count static rows as from a CQL point of view, it will be merge with other + // rows in the partition. However, if we only have the static row, it will be returned as one row + // so count it. 
+ if (hasLiveStaticRow && rowInCurrentPartition == 0) + ++rowCounted; + } + + public int counted() + { + return rowCounted; + } + + public int countedInCurrentPartition() + { + return rowInCurrentPartition; + } + + public boolean isDone() + { + return rowCounted >= rowLimit; + } + + public boolean isDoneForPartition() + { + return isDone() || rowInCurrentPartition >= perPartitionLimit; + } + + public void newRow(Row row) + { + if (assumeLiveData || row.hasLiveData(nowInSec)) + { + ++rowCounted; + ++rowInCurrentPartition; + } + } + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + + if (rowLimit != Integer.MAX_VALUE) + { + sb.append("LIMIT ").append(rowLimit); + if (perPartitionLimit != Integer.MAX_VALUE) + sb.append(" "); + } + + if (perPartitionLimit != Integer.MAX_VALUE) + sb.append("PER PARTITION LIMIT ").append(perPartitionLimit); + + return sb.toString(); + } + } + + private static class CQLPagingLimits extends CQLLimits + { + private final ByteBuffer lastReturnedKey; + private final int lastReturnedKeyRemaining; + + public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + super(rowLimit, perPartitionLimit, isDistinct); + this.lastReturnedKey = lastReturnedKey; + this.lastReturnedKeyRemaining = lastReturnedKeyRemaining; + } + + @Override + protected Kind kind() + { + return Kind.CQL_PAGING_LIMIT; + } + + @Override + public DataLimits forPaging(int pageSize) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + @Override + public Counter newCounter(int nowInSec, boolean assumeLiveData) + { + return new PagingAwareCounter(nowInSec, assumeLiveData); + } + + private class PagingAwareCounter extends CQLCounter + { + private PagingAwareCounter(int nowInSec, boolean assumeLiveData) + { + super(nowInSec, assumeLiveData); + } + + @Override + public void newPartition(DecoratedKey partitionKey, Row staticRow) + { + if (partitionKey.getKey().equals(lastReturnedKey)) + { + rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining; + // lastReturnedKey is the last key for which we're returned rows in the first page. + // So, since we know we have returned rows, we know we have accounted for the static row + // if any already, so force hasLiveStaticRow to false so we make sure to not count it + // once more. + hasLiveStaticRow = false; + } + else + { + super.newPartition(partitionKey, staticRow); + } + } + } + } + + /** + * Limits used by thrift; this count partition and cells. + */ + private static class ThriftLimits extends DataLimits + { + protected final int partitionLimit; + protected final int cellPerPartitionLimit; + + private ThriftLimits(int partitionLimit, int cellPerPartitionLimit) + { + this.partitionLimit = partitionLimit; + this.cellPerPartitionLimit = cellPerPartitionLimit; + } + + protected Kind kind() + { + return Kind.THRIFT_LIMIT; + } + + public boolean isUnlimited() + { + return partitionLimit == Integer.MAX_VALUE && cellPerPartitionLimit == Integer.MAX_VALUE; + } + + public DataLimits forPaging(int pageSize) + { + // We don't support paging on thrift in general but do use paging under the hood for get_count. For + // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single + // partition). 
We do check that the partition limit is 1 however to make sure this is not misused + // (as this wouldn't work properly for range queries). + assert partitionLimit == 1; + return new ThriftLimits(partitionLimit, pageSize); + } + + public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining) + { + throw new UnsupportedOperationException(); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // Short read retries are always done for a single partition at a time, so it's ok to ignore the + // partition limit for those + return new ThriftLimits(1, toFetch); + } + + public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec) + { + // We want the number of cells that are currently live. Getting that precise number forces + // us to iterate the cached partition in general, but we can avoid that if: + // - The number of non-expiring live cells is greater than the number of cells asked (we then + // know we have enough live cells). + // - The number of cells cached is less than requested, in which case we know we won't have enough. + if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit) + return true; + + if (cached.nonTombstoneCellCount() < cellPerPartitionLimit) + return false; + + // Otherwise, we need to re-count + try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false); + CountingUnfilteredRowIterator iter = new CountingUnfilteredRowIterator(cacheIter, newCounter(nowInSec, false))) + { + // Consume the iterator until we've counted enough + while (iter.hasNext() && !iter.counter().isDone()) + iter.next(); + return iter.counter().isDone(); + } + } + + public Counter newCounter(int nowInSec, boolean assumeLiveData) + { + return new ThriftCounter(nowInSec, assumeLiveData); + } + + public int count() + { + return partitionLimit * cellPerPartitionLimit; + } + + public int perPartitionCount() + { + return cellPerPartitionLimit; + } + + public boolean countsCells() + { + return true; + } + + public float estimateTotalResults(ColumnFamilyStore cfs) + { + // remember that getMeansColumns returns a number of cells: we should clean nomenclature + float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.columnCount(); + return cellsPerPartition * cfs.estimateKeys(); + } + + protected class ThriftCounter implements Counter + { + protected final int nowInSec; + protected final boolean assumeLiveData; + + protected int partitionsCounted; + protected int cellsCounted; + protected int cellsInCurrentPartition; + + public ThriftCounter(int nowInSec, boolean assumeLiveData) + { + this.nowInSec = nowInSec; + this.assumeLiveData = assumeLiveData; + } + + public void newPartition(DecoratedKey partitionKey, Row staticRow) + { + cellsInCurrentPartition = 0; + if (!staticRow.isEmpty()) + newRow(staticRow); + } + + public void endOfPartition() + { + ++partitionsCounted; + } + + public int counted() + { + return cellsCounted; + } + + public int countedInCurrentPartition() + { + return cellsInCurrentPartition; + } + + public boolean isDone() + { + return partitionsCounted >= partitionLimit; + } + + public boolean isDoneForPartition() + { + return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit; + } + + public void newRow(Row row) + { + for (Cell cell : row) + { + if (assumeLiveData || cell.isLive(nowInSec)) + { + ++cellsCounted; + ++cellsInCurrentPartition; + } + } + } + } + + @Override + public String toString() + { + // This is not 
valid CQL, but that's ok since it's not used for CQL queries. + return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit); + } + } + + /** + * Limits used for thrift get_count when we only want to count super columns. + */ + private static class SuperColumnCountingLimits extends ThriftLimits + { + private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit) + { + super(partitionLimit, cellPerPartitionLimit); + } + + protected Kind kind() + { + return Kind.SUPER_COLUMN_COUNTING_LIMIT; + } + + public DataLimits forPaging(int pageSize) + { + // We don't support paging on thrift in general but do use paging under the hood for get_count. For + // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single + // partition). We do check that the partition limit is 1 however to make sure this is not misused + // (as this wouldn't work properly for range queries). + assert partitionLimit == 1; + return new SuperColumnCountingLimits(partitionLimit, pageSize); + } + + public DataLimits forShortReadRetry(int toFetch) + { + // Short read retries are always done for a single partition at a time, so it's ok to ignore the + // partition limit for those + return new SuperColumnCountingLimits(1, toFetch); + } + + public Counter newCounter(int nowInSec, boolean assumeLiveData) + { + return new SuperColumnCountingCounter(nowInSec, assumeLiveData); + } + + protected class SuperColumnCountingCounter extends ThriftCounter + { + public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData) + { + super(nowInSec, assumeLiveData); + } + + public void newRow(Row row) + { + // In the internal format, a row == a super column, so that's what we want to count. + if (assumeLiveData || row.hasLiveData(nowInSec)) + { + ++cellsCounted; + ++cellsInCurrentPartition; + } + } + } + } + + public static class Serializer + { + public void serialize(DataLimits limits, DataOutputPlus out, int version) throws IOException + { + out.writeByte(limits.kind().ordinal()); + switch (limits.kind()) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + CQLLimits cqlLimits = (CQLLimits)limits; + out.writeInt(cqlLimits.rowLimit); + out.writeInt(cqlLimits.perPartitionLimit); + out.writeBoolean(cqlLimits.isDistinct); + if (limits.kind() == Kind.CQL_PAGING_LIMIT) + { + CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; + ByteBufferUtil.writeWithShortLength(pagingLimits.lastReturnedKey, out); + out.writeInt(pagingLimits.lastReturnedKeyRemaining); + } + break; + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + ThriftLimits thriftLimits = (ThriftLimits)limits; + out.writeInt(thriftLimits.partitionLimit); + out.writeInt(thriftLimits.cellPerPartitionLimit); + break; + } + } + + public DataLimits deserialize(DataInput in, int version) throws IOException + { + Kind kind = Kind.values()[in.readUnsignedByte()]; + switch (kind) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + int rowLimit = in.readInt(); + int perPartitionLimit = in.readInt(); + boolean isDistinct = in.readBoolean(); + if (kind == Kind.CQL_LIMIT) + return new CQLLimits(rowLimit, perPartitionLimit, isDistinct); + + ByteBuffer lastKey = ByteBufferUtil.readWithShortLength(in); + int lastRemaining = in.readInt(); + return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining); + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + int partitionLimit = in.readInt(); + int cellPerPartitionLimit = in.readInt(); + return kind == 
Kind.THRIFT_LIMIT + ? new ThriftLimits(partitionLimit, cellPerPartitionLimit) + : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit); + } + throw new AssertionError(); + } + + public long serializedSize(DataLimits limits, int version) + { + TypeSizes sizes = TypeSizes.NATIVE; + long size = sizes.sizeof((byte)limits.kind().ordinal()); + switch (limits.kind()) + { + case CQL_LIMIT: + case CQL_PAGING_LIMIT: + CQLLimits cqlLimits = (CQLLimits)limits; + size += sizes.sizeof(cqlLimits.rowLimit); + size += sizes.sizeof(cqlLimits.perPartitionLimit); + size += sizes.sizeof(cqlLimits.isDistinct); + if (limits.kind() == Kind.CQL_PAGING_LIMIT) + { + CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits; + size += ByteBufferUtil.serializedSizeWithShortLength(pagingLimits.lastReturnedKey, sizes); + size += sizes.sizeof(pagingLimits.lastReturnedKeyRemaining); + } + break; + case THRIFT_LIMIT: + case SUPER_COLUMN_COUNTING_LIMIT: + ThriftLimits thriftLimits = (ThriftLimits)limits; + size += sizes.sizeof(thriftLimits.partitionLimit); + size += sizes.sizeof(thriftLimits.cellPerPartitionLimit); + break; + default: + throw new AssertionError(); + } + return size; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java b/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java deleted file mode 100644 index 50ab57d..0000000 --- a/src/java/org/apache/cassandra/db/filter/ExtendedFilter.java +++ /dev/null @@ -1,499 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.filter; - -import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - -import com.google.common.base.Objects; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.Operator; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IndexExpression; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.marshal.*; - -/** - * Extends a column filter (IFilter) to include a number of IndexExpression. 
- */ -public abstract class ExtendedFilter -{ - private static final Logger logger = LoggerFactory.getLogger(ExtendedFilter.class); - - public final ColumnFamilyStore cfs; - public final long timestamp; - public final DataRange dataRange; - private final int maxResults; - private final boolean countCQL3Rows; - private volatile int currentLimit; - - public static ExtendedFilter create(ColumnFamilyStore cfs, - DataRange dataRange, - List<IndexExpression> clause, - int maxResults, - boolean countCQL3Rows, - long timestamp) - { - if (clause == null || clause.isEmpty()) - return new EmptyClauseFilter(cfs, dataRange, maxResults, countCQL3Rows, timestamp); - - return new WithClauses(cfs, dataRange, clause, maxResults, countCQL3Rows, timestamp); - } - - protected ExtendedFilter(ColumnFamilyStore cfs, DataRange dataRange, int maxResults, boolean countCQL3Rows, long timestamp) - { - assert cfs != null; - assert dataRange != null; - this.cfs = cfs; - this.dataRange = dataRange; - this.maxResults = maxResults; - this.timestamp = timestamp; - this.countCQL3Rows = countCQL3Rows; - this.currentLimit = maxResults; - if (countCQL3Rows) - dataRange.updateColumnsLimit(maxResults); - } - - public int maxRows() - { - return countCQL3Rows ? Integer.MAX_VALUE : maxResults; - } - - public int maxColumns() - { - return countCQL3Rows ? maxResults : Integer.MAX_VALUE; - } - - public int currentLimit() - { - return currentLimit; - } - - public IDiskAtomFilter columnFilter(ByteBuffer key) - { - return dataRange.columnFilter(key); - } - - public int lastCounted(ColumnFamily data) - { - return dataRange.getLiveCount(data, timestamp); - } - - public void updateFilter(int currentColumnsCount) - { - if (!countCQL3Rows) - return; - - currentLimit = maxResults - currentColumnsCount; - // We propagate that limit to the underlying filter so each internal query don't - // fetch more than we needs it to. - dataRange.updateColumnsLimit(currentLimit); - } - - public abstract List<IndexExpression> getClause(); - - /** - * Returns a filter to query the columns from the clause that the initial slice filter may not have caught. - * @param data the data retrieve by the initial filter - * @return a filter or null if there can't be any columns we missed with our initial filter (typically if it was a names query, or a slice of the entire row) - */ - public abstract IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data); - - /** - * @return data pruned down to the columns originally asked for - */ - public abstract ColumnFamily prune(DecoratedKey key, ColumnFamily data); - - /** Returns true if tombstoned partitions should not be included in results or count towards the limit, false otherwise. */ - public boolean ignoreTombstonedPartitions() - { - return dataRange.ignoredTombstonedPartitions(); - } - - /** - * @return true if the provided data satisfies all the expressions from - * the clause of this filter. 
-    /**
-     * @return true if the provided data satisfies all the expressions from
-     *         the clause of this filter.
-     */
-    public abstract boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement);
-
-    public static boolean satisfies(int comparison, Operator op)
-    {
-        switch (op)
-        {
-            case EQ:
-                return comparison == 0;
-            case GTE:
-                return comparison >= 0;
-            case GT:
-                return comparison > 0;
-            case LTE:
-                return comparison <= 0;
-            case LT:
-                return comparison < 0;
-            default:
-                throw new IllegalStateException();
-        }
-    }
-
-    @Override
-    public String toString()
-    {
-        return Objects.toStringHelper(this)
-                      .add("dataRange", dataRange)
-                      .add("maxResults", maxResults)
-                      .add("currentLimit", currentLimit)
-                      .add("timestamp", timestamp)
-                      .add("countCQL3Rows", countCQL3Rows)
-                      .toString();
-    }
-
-    public static class WithClauses extends ExtendedFilter
-    {
-        private final List<IndexExpression> clause;
-        private final IDiskAtomFilter optimizedFilter;
-
-        public WithClauses(ColumnFamilyStore cfs,
-                           DataRange range,
-                           List<IndexExpression> clause,
-                           int maxResults,
-                           boolean countCQL3Rows,
-                           long timestamp)
-        {
-            super(cfs, range, maxResults, countCQL3Rows, timestamp);
-            assert clause != null;
-            this.clause = clause;
-            this.optimizedFilter = computeOptimizedFilter();
-        }
-
-        /*
-         * Potentially optimize the column filter if we have a chance to make it catch all clauses
-         * right away.
-         */
-        private IDiskAtomFilter computeOptimizedFilter()
-        {
-            /*
-             * We shouldn't do the "optimization" for composites as the index names are not valid column names
-             * (which the rest of the method assumes). Said optimization is not useful for composites anyway.
-             * We also don't want to do it for paging ranges as the actual filter depends on the row key (it would
-             * probably be possible to make it work but we won't really use it so we don't bother).
-             */
-            if (cfs.getComparator().isCompound() || dataRange instanceof DataRange.Paging)
-                return null;
-
-            IDiskAtomFilter filter = dataRange.columnFilter(null); // ok since not a paging range
-            if (filter instanceof SliceQueryFilter)
-            {
-                // if we have a high chance of getting all the columns in a single index slice (and it's not too costly), do that.
-                // otherwise, the extraFilter (lazily created) will fetch by name the columns referenced by the additional expressions.
-                if (cfs.metric.maxRowSize.getValue() < DatabaseDescriptor.getColumnIndexSize())
-                {
-                    logger.trace("Expanding slice filter to entire row to cover additional expressions");
-                    return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, ((SliceQueryFilter)filter).reversed, Integer.MAX_VALUE);
-                }
-            }
-            else
-            {
-                logger.trace("adding columns to original Filter to cover additional expressions");
-                assert filter instanceof NamesQueryFilter;
-                if (!clause.isEmpty())
-                {
-                    SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
-                    for (IndexExpression expr : clause)
-                        columns.add(cfs.getComparator().cellFromByteBuffer(expr.column));
-                    columns.addAll(((NamesQueryFilter) filter).columns);
-                    return ((NamesQueryFilter) filter).withUpdatedColumns(columns);
-                }
-            }
-            return null;
-        }
-
-        @Override
-        public IDiskAtomFilter columnFilter(ByteBuffer key)
-        {
-            return optimizedFilter == null ? dataRange.columnFilter(key) : optimizedFilter;
-        }
-
-        public List<IndexExpression> getClause()
-        {
-            return clause;
-        }
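
satisfies(comparison, op) above reduces every relational operator to a single three-way comparison result. The same mapping, runnable standalone (Op is a stand-in for Cassandra's Operator enum):

    import java.util.Comparator;

    public class SatisfiesSketch
    {
        enum Op { EQ, GT, GTE, LT, LTE }

        // Maps a three-way comparison result onto a relational operator,
        // mirroring ExtendedFilter.satisfies() above.
        static boolean satisfies(int comparison, Op op)
        {
            switch (op)
            {
                case EQ:  return comparison == 0;
                case GTE: return comparison >= 0;
                case GT:  return comparison > 0;
                case LTE: return comparison <= 0;
                case LT:  return comparison < 0;
                default:  throw new IllegalStateException();
            }
        }

        public static void main(String[] args)
        {
            int cmp = Comparator.<Integer>naturalOrder().compare(5, 3);
            System.out.println(satisfies(cmp, Op.GT)); // true: 5 > 3
        }
    }
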
-        /*
-         * We may need an extra query only if the original query wasn't selecting the row entirely.
-         * Furthermore, we only need the extra query if we haven't yet got all the expressions from the clause.
-         */
-        private boolean needsExtraQuery(ByteBuffer rowKey, ColumnFamily data)
-        {
-            IDiskAtomFilter filter = columnFilter(rowKey);
-            if (filter instanceof SliceQueryFilter && DataRange.isFullRowSlice((SliceQueryFilter)filter))
-                return false;
-
-            for (IndexExpression expr : clause)
-            {
-                if (data.getColumn(data.getComparator().cellFromByteBuffer(expr.column)) == null)
-                {
-                    logger.debug("adding extraFilter to cover additional expressions");
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        public IDiskAtomFilter getExtraFilter(DecoratedKey rowKey, ColumnFamily data)
-        {
-            /*
-             * This method assumes the IndexExpression names are valid column names, which is not the
-             * case with composites. This is ok for now however since:
-             *   1) CompositeSearcher doesn't use it.
-             *   2) We don't yet allow non-indexed range slices with filters in CQL3 (i.e. this will never be
-             *      called by CFS.filter() for composites).
-             */
-            assert !(cfs.getComparator().isCompound()) : "Sequential scan with filters is not supported (if you just created an index, you "
-                                                         + "need to wait for the creation to be propagated to all nodes before querying it)";
-
-            if (!needsExtraQuery(rowKey.getKey(), data))
-                return null;
-
-            // Note: for counters we must be careful not to add a column that was already there (to avoid overcounting). That is
-            // why we avoid querying any column we already have (it's also more efficient anyway).
-            SortedSet<CellName> columns = new TreeSet<CellName>(cfs.getComparator());
-            for (IndexExpression expr : clause)
-            {
-                CellName name = data.getComparator().cellFromByteBuffer(expr.column);
-                if (data.getColumn(name) == null)
-                    columns.add(name);
-            }
-            assert !columns.isEmpty();
-            return new NamesQueryFilter(columns);
-        }
-
-        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
-        {
-            if (optimizedFilter == null)
-                return data;
-
-            ColumnFamily pruned = data.cloneMeShallow();
-            IDiskAtomFilter filter = dataRange.columnFilter(rowKey.getKey());
-            Iterator<Cell> iter = filter.getColumnIterator(data);
-            try
-            {
-                filter.collectReducedColumns(pruned, QueryFilter.gatherTombstones(pruned, iter), rowKey, cfs.gcBefore(timestamp), timestamp);
-            }
-            catch (TombstoneOverwhelmingException e)
-            {
-                e.setKey(rowKey);
-                throw e;
-            }
-            return pruned;
-        }
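
getExtraFilter() above queries by name only the clause columns missing from the first result, both to avoid double-counting counters and to avoid re-reading data it already has. A simplified model of that "missing columns" computation using plain JDK sets (the class and method names are hypothetical):

    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;

    public class ExtraFilterSketch
    {
        // Columns referenced by the clause but absent from what was fetched;
        // these are the only ones worth an extra by-name query.
        static SortedSet<String> missingColumns(Set<String> fetched, Set<String> clauseColumns)
        {
            SortedSet<String> missing = new TreeSet<>(clauseColumns);
            missing.removeAll(fetched); // never re-request a column we already have
            return missing;
        }

        public static void main(String[] args)
        {
            Set<String> fetched = Set.of("a", "b");
            Set<String> clause  = Set.of("b", "c");
            System.out.println(missingColumns(fetched, clause)); // [c] -> extra query needed
        }
    }
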
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
-        {
-            for (IndexExpression expression : clause)
-            {
-                ColumnDefinition def = data.metadata().getColumnDefinition(expression.column);
-                ByteBuffer dataValue = null;
-                AbstractType<?> validator = null;
-                if (def == null)
-                {
-                    // This can't happen with CQL3 as this should be rejected upfront. For thrift however,
-                    // cell names are not predefined. But that means the cell name corresponds to an internal one.
-                    Cell cell = data.getColumn(data.getComparator().cellFromByteBuffer(expression.column));
-                    if (cell != null)
-                    {
-                        dataValue = cell.value();
-                        validator = data.metadata().getDefaultValidator();
-                    }
-                }
-                else
-                {
-                    if (def.type.isCollection() && def.type.isMultiCell())
-                    {
-                        if (!collectionSatisfies(def, data, prefix, expression))
-                            return false;
-                        continue;
-                    }
-
-                    dataValue = extractDataValue(def, rowKey.getKey(), data, prefix);
-                    validator = def.type;
-                }
-
-                if (dataValue == null)
-                    return false;
-
-                if (expression.operator == Operator.CONTAINS)
-                {
-                    assert def != null && def.type.isCollection() && !def.type.isMultiCell();
-                    CollectionType type = (CollectionType)def.type;
-                    switch (type.kind)
-                    {
-                        case LIST:
-                            ListType<?> listType = (ListType)def.type;
-                            if (!listType.getSerializer().deserialize(dataValue).contains(listType.getElementsType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                        case SET:
-                            SetType<?> setType = (SetType)def.type;
-                            if (!setType.getSerializer().deserialize(dataValue).contains(setType.getElementsType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                        case MAP:
-                            MapType<?,?> mapType = (MapType)def.type;
-                            if (!mapType.getSerializer().deserialize(dataValue).containsValue(mapType.getValuesType().getSerializer().deserialize(expression.value)))
-                                return false;
-                            break;
-                    }
-                }
-                else if (expression.operator == Operator.CONTAINS_KEY)
-                {
-                    assert def != null && def.type.isCollection() && !def.type.isMultiCell() && def.type instanceof MapType;
-                    MapType<?,?> mapType = (MapType)def.type;
-                    if (mapType.getSerializer().getSerializedValue(dataValue, expression.value, mapType.getKeysType()) == null)
-                        return false;
-                }
-                else
-                {
-                    int v = validator.compare(dataValue, expression.value);
-                    if (!satisfies(v, expression.operator))
-                        return false;
-                }
-            }
-            return true;
-        }
-
-        private static boolean collectionSatisfies(ColumnDefinition def, ColumnFamily data, Composite prefix, IndexExpression expr)
-        {
-            assert def.type.isCollection() && def.type.isMultiCell();
-            CollectionType type = (CollectionType)def.type;
-
-            if (expr.isContains())
-            {
-                // get a slice of the collection cells
-                Iterator<Cell> iter = data.iterator(new ColumnSlice[]{ data.getComparator().create(prefix, def).slice() });
-                while (iter.hasNext())
-                {
-                    Cell cell = iter.next();
-                    if (type.kind == CollectionType.Kind.SET)
-                    {
-                        if (type.nameComparator().compare(cell.name().collectionElement(), expr.value) == 0)
-                            return true;
-                    }
-                    else
-                    {
-                        if (type.valueComparator().compare(cell.value(), expr.value) == 0)
-                            return true;
-                    }
-                }
-
-                return false;
-            }
-
-            assert type.kind == CollectionType.Kind.MAP;
-            if (expr.isContainsKey())
-                return data.getColumn(data.getComparator().create(prefix, def, expr.value)) != null;
-
-            Iterator<Cell> iter = data.iterator(new ColumnSlice[]{ data.getComparator().create(prefix, def).slice() });
-            ByteBuffer key = CompositeType.extractComponent(expr.value, 0);
-            ByteBuffer value = CompositeType.extractComponent(expr.value, 1);
-            while (iter.hasNext())
-            {
-                Cell next = iter.next();
-                if (type.nameComparator().compare(next.name().collectionElement(), key) == 0 &&
-                    type.valueComparator().compare(next.value(), value) == 0)
-                    return true;
-            }
-            return false;
-        }
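
The CONTAINS and CONTAINS_KEY branches above deserialize a frozen collection and test membership: list/set elements or map values for CONTAINS, map keys only for CONTAINS_KEY. A plain-Java analogue of that semantics (JDK collections standing in for Cassandra's serializers; names are illustrative):

    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class ContainsSketch
    {
        static boolean listContains(List<?> list, Object v)    { return list.contains(v); }      // CONTAINS on a list
        static boolean setContains(Set<?> set, Object v)       { return set.contains(v); }       // CONTAINS on a set
        static boolean mapContains(Map<?, ?> map, Object v)    { return map.containsValue(v); }  // CONTAINS on a map
        static boolean mapContainsKey(Map<?, ?> map, Object k) { return map.containsKey(k); }    // CONTAINS_KEY

        public static void main(String[] args)
        {
            Map<String, Integer> m = Map.of("x", 1, "y", 2);
            System.out.println(mapContains(m, 2));      // true
            System.out.println(mapContainsKey(m, "z")); // false
        }
    }
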
-        private ByteBuffer extractDataValue(ColumnDefinition def, ByteBuffer rowKey, ColumnFamily data, Composite prefix)
-        {
-            switch (def.kind)
-            {
-                case PARTITION_KEY:
-                    return def.isOnAllComponents()
-                         ? rowKey
-                         : ((CompositeType)data.metadata().getKeyValidator()).split(rowKey)[def.position()];
-                case CLUSTERING_COLUMN:
-                    return prefix.get(def.position());
-                case REGULAR:
-                    CellName cname = prefix == null
-                                   ? data.getComparator().cellFromByteBuffer(def.name.bytes)
-                                   : data.getComparator().create(prefix, def);
-
-                    Cell cell = data.getColumn(cname);
-                    return cell == null ? null : cell.value();
-                case COMPACT_VALUE:
-                    assert data.getColumnCount() == 1;
-                    return data.getSortedColumns().iterator().next().value();
-            }
-            throw new AssertionError();
-        }
-
-        @Override
-        public String toString()
-        {
-            return Objects.toStringHelper(this)
-                          .add("dataRange", dataRange)
-                          .add("timestamp", timestamp)
-                          .add("clause", clause)
-                          .toString();
-        }
-    }
-
-    private static class EmptyClauseFilter extends ExtendedFilter
-    {
-        public EmptyClauseFilter(ColumnFamilyStore cfs, DataRange range, int maxResults, boolean countCQL3Rows, long timestamp)
-        {
-            super(cfs, range, maxResults, countCQL3Rows, timestamp);
-        }
-
-        public List<IndexExpression> getClause()
-        {
-            return Collections.<IndexExpression>emptyList();
-        }
-
-        public IDiskAtomFilter getExtraFilter(DecoratedKey key, ColumnFamily data)
-        {
-            return null;
-        }
-
-        public ColumnFamily prune(DecoratedKey rowKey, ColumnFamily data)
-        {
-            return data;
-        }
-
-        public boolean isSatisfiedBy(DecoratedKey rowKey, ColumnFamily data, Composite prefix, ByteBuffer collectionElement)
-        {
-            return true;
-        }
-    }
-}
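
extractDataValue() above dispatches on the column kind: a partition-key component is split out of the row key, a clustering component is read from the prefix, and a regular column is looked up as a cell (a null lookup means the expression cannot be satisfied). A loose, simplified sketch of that dispatch, with all types replaced by hypothetical plain-Java stand-ins:

    import java.util.List;
    import java.util.Map;

    public class ExtractValueSketch
    {
        enum Kind { PARTITION_KEY, CLUSTERING_COLUMN, REGULAR }

        static Object extract(Kind kind, int position, List<?> keyComponents,
                              List<?> clusteringPrefix, Map<String, ?> cells, String name)
        {
            switch (kind)
            {
                case PARTITION_KEY:     return keyComponents.get(position);    // component of the (split) key
                case CLUSTERING_COLUMN: return clusteringPrefix.get(position); // component of the clustering prefix
                case REGULAR:           return cells.get(name);                // null -> expression unsatisfied
            }
            throw new AssertionError();
        }

        public static void main(String[] args)
        {
            System.out.println(extract(Kind.REGULAR, 0, List.of(), List.of(), Map.of("c", 42), "c")); // 42
        }
    }
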
