Repository: cassandra Updated Branches: refs/heads/trunk 129b68c1c -> fd74a0360
Re-enable skipping non-queried column values patch by slebresne; reviewed by beobal for CASSANDRA-10657 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fd74a036 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fd74a036 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fd74a036 Branch: refs/heads/trunk Commit: fd74a03602421ca07b6b1087803c54577adae4dd Parents: 129b68c Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Dec 22 17:08:17 2015 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Jan 22 15:25:12 2016 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/SinglePartitionReadCommand.java | 2 +- .../cassandra/db/filter/ColumnFilter.java | 217 +++++++++++-------- .../db/partitions/PartitionUpdate.java | 26 ++- .../org/apache/cassandra/db/rows/BTreeRow.java | 34 ++- .../cassandra/db/rows/ComplexColumnData.java | 22 +- src/java/org/apache/cassandra/db/rows/Row.java | 10 + .../apache/cassandra/db/rows/RowIterators.java | 17 ++ .../cassandra/db/rows/SerializationHelper.java | 39 +++- .../db/rows/UnfilteredRowIterators.java | 18 ++ .../cassandra/db/rows/UnfilteredSerializer.java | 4 +- .../cassandra/db/rows/WithOnlyQueriedData.java | 49 +++++ .../apache/cassandra/schema/SchemaKeyspace.java | 2 +- .../apache/cassandra/service/DataResolver.java | 15 +- .../cassandra/streaming/StreamReceiveTask.java | 3 +- .../cassandra/thrift/CassandraServer.java | 4 +- 16 files changed, 338 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a44b967..5c577f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.4 + * Skip values for non-queried columns 
(CASSANDRA-10657) * Add support for secondary indexes on static columns (CASSANDRA-8103) * CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051) * Add metric for number of dropped mutations (CASSANDRA-10866) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4c87d10..a1de3d6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -746,7 +746,7 @@ public class SinglePartitionReadCommand extends ReadCommand try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) { - final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter)); + final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter())); StageManager.getStage(Stage.MUTATION).execute(new Runnable() { public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index e22c154..9c4c714 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -36,44 +36,57 @@ import org.apache.cassandra.io.util.DataOutputPlus; * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected * by a query. 
* - * In practice, this class cover 2 main cases: - * 1) most user queries have to internally query all columns, because the CQL semantic requires us to know if - * a row is live or not even if it has no values for the columns requested by the user (see #6588for more - * details). However, while we need to know for columns if it has live values, we can actually save from - * sending the values for those columns that will not be returned to the user. - * 2) for some internal queries (and for queries using #6588 if we introduce it), we're actually fine only - * actually querying some of the columns. + * We distinguish 2 sets of columns in practice: the _fetched_ columns, which are the columns that we (may, see + * below) need to fetch internally, and the _queried_ columns, which are the columns that the user has selected + * in its request. * - * For complex columns, this class allows to be more fine grained than the column by only selection some of the - * cells of the complex column (either individual cell by path name, or some slice). + * The reason for distinguishing those 2 sets is that due to the CQL semantic (see #6588 for more details), we + * often need to internally fetch all columns for the queried table, but can still do some optimizations for those + * columns that are not directly queried by the user (see #10657 for more details). + * + * Note that in practice: + * - the _queried_ columns set is always included in the _fetched_ one. + * - whenever those sets are different, we know the _fetched_ set contains all columns for the table, so we + * don't have to record this set, we just keep a pointer to the table metadata. The only set we concretely + * store is thus the _queried_ one. + * - in the special case of a {@code SELECT *} query, we want to query all columns, and _fetched_ == _queried. + * As this is a common case, we special case it by keeping the _queried_ set {@code null} (and we retrieve + * the columns through the metadata pointer). 
+ * + * For complex columns, this class optionally allows to specify a subset of the cells to query for each column. + * We can either select individual cells by path name, or a slice of them. Note that this is a sub-selection of + * _queried_ cells, so if _fetched_ != _queried_, then the cell selected by this sub-selection are considered + * queried and the other ones are considered fetched (and if a column has some sub-selection, it must be a queried + * column, which is actually enforced by the Builder below). */ public class ColumnFilter { public static final Serializer serializer = new Serializer(); - // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved - // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped. - // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all. + // True if _fetched_ is all the columns, in which case metadata must not be null. If false, + // then _fetched_ == _queried_ and we only store _queried_. private final boolean isFetchAll; private final CFMetaData metadata; // can be null if !isFetchAll - private final PartitionColumns selection; // can be null if isFetchAll and we don't want to skip any value + private final PartitionColumns queried; // can be null if isFetchAll and _fetched_ == _queried_ private final SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections; // can be null private ColumnFilter(boolean isFetchAll, CFMetaData metadata, - PartitionColumns columns, + PartitionColumns queried, SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections) { + assert !isFetchAll || metadata != null; + assert isFetchAll || queried != null; this.isFetchAll = isFetchAll; this.metadata = metadata; - this.selection = columns; + this.queried = queried; this.subSelections = subSelections; } /** - * A selection that includes all columns (and their values). 
+ * A filter that includes all columns for the provided table. */ public static ColumnFilter all(CFMetaData metadata) { @@ -81,7 +94,7 @@ public class ColumnFilter } /** - * A selection that only fetch the provided columns. + * A filter that only fetches/queries the provided columns. * <p> * Note that this shouldn't be used for CQL queries in general as all columns should be queried to * preserve CQL semantic (see class javadoc). This is ok for some internal queries however (and @@ -93,81 +106,93 @@ public class ColumnFilter } /** - * The columns that needs to be fetched internally for this selection. - * <p> - * This is the columns that must be present in the internal rows returned by queries using this selection, - * not the columns that are actually queried by the user (see the class javadoc for details). + * The columns that needs to be fetched internally for this filter. * - * @return the column to fetch for this selection. + * @return the columns to fetch for this filter. */ public PartitionColumns fetchedColumns() { - return isFetchAll ? metadata.partitionColumns() : selection; + return isFetchAll ? metadata.partitionColumns() : queried; + } + + /** + * The columns actually queried by the user. + * <p> + * Note that this is in general not all the columns that are fetched internally (see {@link #fetchedColumns}). + */ + public PartitionColumns queriedColumns() + { + assert queried != null || isFetchAll; + return queried == null ? metadata.partitionColumns() : queried; } - public boolean includesAllColumns() + public boolean fetchesAllColumns() { return isFetchAll; } /** - * Whether the provided column is selected by this selection. + * Whether _fetched_ == _queried_ for this filter. Only when this returns {@code false} can the + * {@code fetchedColumnIsQueried()} and {@code fetchedCellIsQueried()} methods return {@code false} for some column/cell. 
*/ - public boolean includes(ColumnDefinition column) + public boolean allFetchedColumnsAreQueried() { - return isFetchAll || selection.contains(column); + return !isFetchAll || (queried == null && subSelections == null); } /** - * Whether we can skip the value for the provided selected column. + * Whether the provided column is fetched by this filter. */ - public boolean canSkipValue(ColumnDefinition column) + public boolean fetches(ColumnDefinition column) { - // We don't use that currently, see #10655 for more details. - return false; + return isFetchAll || queried.contains(column); } /** - * Whether the provided cell of a complex column is selected by this selection. + * Whether the provided column, which is assumed to be _fetched_ by this filter (so the caller must guarantee + * that {@code fetches(column) == true}, is also _queried_ by the user. + * + * !WARNING! please be sure to understand the difference between _fetched_ and _queried_ + * columns that this class made before using this method. If unsure, you probably want + * to use the {@link #fetches} method. */ - public boolean includes(Cell cell) + public boolean fetchedColumnIsQueried(ColumnDefinition column) { - if (isFetchAll || subSelections == null || !cell.column().isComplex()) - return true; - - SortedSet<ColumnSubselection> s = subSelections.get(cell.column().name); - if (s.isEmpty()) - return true; - - for (ColumnSubselection subSel : s) - if (subSel.compareInclusionOf(cell.path()) == 0) - return true; - - return false; + return !isFetchAll || queried == null || queried.contains(column); } /** - * Whether we can skip the value of the cell of a complex column. + * Whether the provided complex cell (identified by its column and path), which is assumed to be _fetched_ by + * this filter, is also _queried_ by the user. + * + * !WARNING! please be sure to understand the difference between _fetched_ and _queried_ + * columns that this class made before using this method. 
If unsure, you probably want + * to use the {@link #fetches} method. */ - public boolean canSkipValue(ColumnDefinition column, CellPath path) + public boolean fetchedCellIsQueried(ColumnDefinition column, CellPath path) { - if (!isFetchAll || subSelections == null || !column.isComplex()) - return false; + assert path != null; + if (!isFetchAll || subSelections == null) + return true; SortedSet<ColumnSubselection> s = subSelections.get(column.name); + // No subsection for this column means everything is queried if (s.isEmpty()) - return false; + return true; for (ColumnSubselection subSel : s) if (subSel.compareInclusionOf(path) == 0) - return false; + return true; - return true; + return false; } /** * Creates a new {@code Tester} to efficiently test the inclusion of cells of complex column * {@code column}. + * + * @return the created tester or {@code null} if all the cells from the provided column + * are queried. */ public Tester newTester(ColumnDefinition column) { @@ -182,8 +207,8 @@ public class ColumnFilter } /** - * Returns a {@code ColumnFilter}} builder that includes all columns (so the selections - * added to the builder are the columns/cells for which we shouldn't skip the values). + * Returns a {@code ColumnFilter}} builder that fetches all columns (and queries the columns + * added to the builder, or everything if no column is added). */ public static Builder allColumnsBuilder(CFMetaData metadata) { @@ -191,8 +216,7 @@ public class ColumnFilter } /** - * Returns a {@code ColumnFilter}} builder that includes only the columns/cells - * added to the builder. + * Returns a {@code ColumnFilter} builder that only fetches the columns/cells added to the builder. 
*/ public static Builder selectionBuilder() { @@ -211,17 +235,20 @@ public class ColumnFilter this.iterator = iterator; } - public boolean includes(CellPath path) + public boolean fetches(CellPath path) { - return isFetchAll || includedBySubselection(path); + return isFetchAll || hasSubselection(path); } - public boolean canSkipValue(CellPath path) + /** + * Must only be called if {@code fetches(path) == true}. + */ + public boolean fetchedCellIsQueried(CellPath path) { - return isFetchAll && !includedBySubselection(path); + return !isFetchAll || hasSubselection(path); } - private boolean includedBySubselection(CellPath path) + private boolean hasSubselection(CellPath path) { while (current != null || iterator.hasNext()) { @@ -241,10 +268,22 @@ public class ColumnFilter } } + /** + * A builder for a {@code ColumnFilter} object. + * + * Note that the columns added to this builder are the _queried_ columns. Whether or not all columns + * are _fetched_ depends on which factory method you've used to obtain this builder, allColumnsBuilder (all + * columns are fetched) or selectionBuilder (only the queried columns are fetched). + * + * Note that for an allColumnsBuilder, if no queried columns are added, this is interpreted as querying + * all columns, not querying none (but if you know you want to query all columns, prefer + * {@link ColumnFilter#all}). For selectionBuilder, adding no queried columns means no column will be + * fetched (so the builder will return {@code PartitionColumns.NONE}). 
+ */ public static class Builder { - private final CFMetaData metadata; - private PartitionColumns.Builder selection; + private final CFMetaData metadata; // null if we don't fetch all columns + private PartitionColumns.Builder queriedBuilder; private List<ColumnSubselection> subSelections; private Builder(CFMetaData metadata) @@ -254,17 +293,17 @@ public class ColumnFilter public Builder add(ColumnDefinition c) { - if (selection == null) - selection = PartitionColumns.builder(); - selection.add(c); + if (queriedBuilder == null) + queriedBuilder = PartitionColumns.builder(); + queriedBuilder.add(c); return this; } public Builder addAll(Iterable<ColumnDefinition> columns) { - if (selection == null) - selection = PartitionColumns.builder(); - selection.addAll(columns); + if (queriedBuilder == null) + queriedBuilder = PartitionColumns.builder(); + queriedBuilder.addAll(columns); return this; } @@ -291,11 +330,11 @@ public class ColumnFilter { boolean isFetchAll = metadata != null; - PartitionColumns selectedColumns = selection == null ? null : selection.build(); - // It's only ok to have selection == null in ColumnFilter if isFetchAll. So deal with the case of a "selection" builder + PartitionColumns queried = queriedBuilder == null ? null : queriedBuilder.build(); + // It's only ok to have queried == null in ColumnFilter if isFetchAll. So deal with the case of a selectionBuilder // with nothing selected (we can at least happen on some backward compatible queries - CASSANDRA-10471). 
- if (!isFetchAll && selectedColumns == null) - selectedColumns = PartitionColumns.NONE; + if (!isFetchAll && queried == null) + queried = PartitionColumns.NONE; SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null; if (subSelections != null) @@ -305,7 +344,7 @@ public class ColumnFilter s.put(subSelection.column().name, subSelection); } - return new ColumnFilter(isFetchAll, metadata, selectedColumns, s); + return new ColumnFilter(isFetchAll, metadata, queried, s); } } @@ -315,10 +354,10 @@ public class ColumnFilter if (isFetchAll) return "*"; - if (selection.isEmpty()) + if (queried.isEmpty()) return ""; - Iterator<ColumnDefinition> defs = selection.selectOrderIterator(); + Iterator<ColumnDefinition> defs = queried.selectOrderIterator(); if (!defs.hasNext()) return "<none>"; @@ -355,13 +394,13 @@ public class ColumnFilter public static class Serializer { private static final int IS_FETCH_ALL_MASK = 0x01; - private static final int HAS_SELECTION_MASK = 0x02; + private static final int HAS_QUERIED_MASK = 0x02; private static final int HAS_SUB_SELECTIONS_MASK = 0x04; private static int makeHeaderByte(ColumnFilter selection) { return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) - | (selection.selection != null ? HAS_SELECTION_MASK : 0) + | (selection.queried != null ? HAS_QUERIED_MASK : 0) | (selection.subSelections != null ? 
HAS_SUB_SELECTIONS_MASK : 0); } @@ -369,10 +408,10 @@ public class ColumnFilter { out.writeByte(makeHeaderByte(selection)); - if (selection.selection != null) + if (selection.queried != null) { - Columns.serializer.serialize(selection.selection.statics, out); - Columns.serializer.serialize(selection.selection.regulars, out); + Columns.serializer.serialize(selection.queried.statics, out); + Columns.serializer.serialize(selection.queried.regulars, out); } if (selection.subSelections != null) @@ -387,15 +426,15 @@ public class ColumnFilter { int header = in.readUnsignedByte(); boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0; - boolean hasSelection = (header & HAS_SELECTION_MASK) != 0; + boolean hasQueried = (header & HAS_QUERIED_MASK) != 0; boolean hasSubSelections = (header & HAS_SUB_SELECTIONS_MASK) != 0; - PartitionColumns selection = null; - if (hasSelection) + PartitionColumns queried = null; + if (hasQueried) { Columns statics = Columns.serializer.deserialize(in, metadata); Columns regulars = Columns.serializer.deserialize(in, metadata); - selection = new PartitionColumns(statics, regulars); + queried = new PartitionColumns(statics, regulars); } SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null; @@ -410,17 +449,17 @@ public class ColumnFilter } } - return new ColumnFilter(isFetchAll, isFetchAll ? metadata : null, selection, subSelections); + return new ColumnFilter(isFetchAll, isFetchAll ? 
metadata : null, queried, subSelections); } public long serializedSize(ColumnFilter selection, int version) { long size = 1; // header byte - if (selection.selection != null) + if (selection.queried != null) { - size += Columns.serializer.serializedSize(selection.selection.statics); - size += Columns.serializer.serializedSize(selection.selection.regulars); + size += Columns.serializer.serializedSize(selection.queried.statics); + size += Columns.serializer.serializedSize(selection.queried.regulars); } if (selection.subSelections != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index f10b3b6..02369e4 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -193,18 +193,36 @@ public class PartitionUpdate extends AbstractBTreePartition /** * Turns the given iterator into an update. * + * @param iterator the iterator to turn into updates. + * @param filter the column filter used when querying {@code iterator}. This is used to make + * sure we don't include data for which the value has been skipped while reading (as we would + * then be writing something incorrect). + * * Warning: this method does not close the provided iterator, it is up to * the caller to close it. 
*/ - public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator) + public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter) { + iterator = UnfilteredRowIterators.withOnlyQueriedData(iterator, filter); Holder holder = build(iterator, 16); MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo; return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false); } - public static PartitionUpdate fromIterator(RowIterator iterator) + /** + * Turns the given iterator into an update. + * + * @param iterator the iterator to turn into updates. + * @param filter the column filter used when querying {@code iterator}. This is used to make + * sure we don't include data for which the value has been skipped while reading (as we would + * then be writing something incorrect). + * + * Warning: this method does not close the provided iterator, it is up to + * the caller to close it. + */ + public static PartitionUpdate fromIterator(RowIterator iterator, ColumnFilter filter) { + iterator = RowIterators.withOnlyQueriedData(iterator, filter); MutableDeletionInfo deletionInfo = MutableDeletionInfo.live(); Holder holder = build(iterator, deletionInfo, true, 16); return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false); @@ -296,7 +314,7 @@ public class PartitionUpdate extends AbstractBTreePartition int nowInSecs = FBUtilities.nowInSeconds(); List<UnfilteredRowIterator> asIterators = Lists.transform(updates, AbstractBTreePartition::unfilteredIterator); - return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs)); + return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs), ColumnFilter.all(updates.get(0).metadata())); } /** @@ -690,7 +708,7 @@ public class PartitionUpdate extends AbstractBTreePartition try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key)) { 
assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families - return PartitionUpdate.fromIterator(iterator); + return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index e8667e0..a0912ae 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -237,10 +237,12 @@ public class BTreeRow extends AbstractRow { Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); - if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty()) + boolean mayFilterColumns = !filter.fetchesAllColumns() || !filter.allFetchedColumnsAreQueried(); + boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time()); + + if (!mayFilterColumns && !mayHaveShadowed && droppedColumns.isEmpty()) return this; - boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time()); LivenessInfo newInfo = primaryKeyLivenessInfo; Deletion newDeletion = deletion; @@ -255,6 +257,8 @@ public class BTreeRow extends AbstractRow Columns columns = filter.fetchedColumns().columns(isStatic()); Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester(); + Predicate<ColumnDefinition> queriedByUserTester = filter.queriedColumns().columns(isStatic()).inOrderInclusionTester(); + final LivenessInfo rowLiveness = newInfo; return transformAndFilter(newInfo, newDeletion, (cd) -> { ColumnDefinition column = cd.column(); @@ -263,11 +267,31 @@ public class BTreeRow extends AbstractRow CFMetaData.DroppedColumn dropped = 
droppedColumns.get(column.name.bytes); if (column.isComplex()) - return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped); + return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped, rowLiveness); Cell cell = (Cell) cd; - return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)) - ? cell : null; + // We include the cell unless it is 1) shadowed, 2) for a dropped column or 3) skippable. + // And a cell is skippable if it is for a column that is not queried by the user and its timestamp + // is lower than the row timestamp (see #10657 or SerializationHelper.includes() for details). + boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime; + boolean isShadowed = mayHaveShadowed && activeDeletion.deletes(cell); + boolean isSkippable = !queriedByUserTester.test(column) && cell.timestamp() < rowLiveness.timestamp(); + return isForDropped || isShadowed || isSkippable ? null : cell; + }); + } + + public Row withOnlyQueriedData(ColumnFilter filter) + { + if (filter.allFetchedColumnsAreQueried()) + return this; + + return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> { + + ColumnDefinition column = cd.column(); + if (column.isComplex()) + return ((ComplexColumnData)cd).withOnlyQueriedData(filter); + + return filter.fetchedColumnIsQueried(column) ? 
cd : null; }); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index fab529b..ac137e7 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -29,6 +29,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.DeletionTime; import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.ByteType; import org.apache.cassandra.db.marshal.SetType; @@ -144,19 +145,21 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared); } - public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped) + public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped, LivenessInfo rowLiveness) { ColumnFilter.Tester cellTester = filter.newTester(column); if (cellTester == null && activeDeletion.isLive() && dropped == null) return this; DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion; - return transformAndFilter(newDeletion, - (cell) -> - (cellTester == null || cellTester.includes(cell.path())) - && !activeDeletion.deletes(cell) - && (dropped == null || cell.timestamp() > dropped.droppedTime) - ? 
cell : null); + return transformAndFilter(newDeletion, (cell) -> + { + boolean isForDropped = dropped != null && cell.timestamp() <= dropped.droppedTime; + boolean isShadowed = activeDeletion.deletes(cell); + boolean isSkippable = cellTester != null && (!cellTester.fetches(cell.path()) + || (!cellTester.fetchedCellIsQueried(cell.path()) && cell.timestamp() < rowLiveness.timestamp())); + return isForDropped || isShadowed || isSkippable ? null : cell; + }); } public ComplexColumnData purge(DeletionPurger purger, int nowInSec) @@ -165,6 +168,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, nowInSec)); } + public ComplexColumnData withOnlyQueriedData(ColumnFilter filter) + { + return transformAndFilter(complexDeletion, (cell) -> filter.fetchedCellIsQueried(column, cell.path()) ? null : cell); + } + private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function) { Object[] transformed = BTree.transformAndFilter(cells, function); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 8a67e9b..173631e 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -205,6 +205,16 @@ public interface Row extends Unfiltered, Collection<ColumnData> public Row purge(DeletionPurger purger, int nowInSec); /** + * Returns a copy of this row which only include the data queried by {@code filter}, excluding anything _fetched_ for + * internal reasons but not queried by the user (see {@link ColumnFilter} for details). + * + * @param filter the {@code ColumnFilter} to use when deciding what is user queried. 
This should be the filter + * that was used when querying the row on which this method is called. + * @return the row but with all data that wasn't queried by the user skipped. + */ + public Row withOnlyQueriedData(ColumnFilter filter); + + /** * Returns a copy of this row where all counter cells have they "local" shard marked for clearing. */ public Row markCounterLocalToBeCleared(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index 551edb8..ca248b6 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.FBUtilities; @@ -49,6 +50,22 @@ public abstract class RowIterators } /** + * Filter the provided iterator to only include cells that are selected by the user. + * + * @param iterator the iterator to filter. + * @param filter the {@code ColumnFilter} to use when deciding which cells are queried by the user. This should be the filter + * that was used when querying {@code iterator}. + * @return the filtered iterator. + */ + public static RowIterator withOnlyQueriedData(RowIterator iterator, ColumnFilter filter) + { + if (filter.allFetchedColumnsAreQueried()) + return iterator; + + return Transformation.apply(iterator, new WithOnlyQueriedData(filter)); + } + + /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. 
* <p> * Note that this is only meant for debugging as this can log a very large amount of http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/SerializationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java index 6b4bc2e..e40a1e1 100644 --- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java +++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java @@ -67,34 +67,49 @@ public class SerializationHelper this(metadata, version, flag, null); } - public Columns fetchedStaticColumns(SerializationHeader header) - { - return columnsToFetch == null ? header.columns().statics : columnsToFetch.fetchedColumns().statics; - } - - public Columns fetchedRegularColumns(SerializationHeader header) + public boolean includes(ColumnDefinition column) { - return columnsToFetch == null ? header.columns().regulars : columnsToFetch.fetchedColumns().regulars; + return columnsToFetch == null || columnsToFetch.fetches(column); } - public boolean includes(ColumnDefinition column) + public boolean includes(Cell cell, LivenessInfo rowLiveness) { - return columnsToFetch == null || columnsToFetch.includes(column); + if (columnsToFetch == null) + return true; + + // During queries, some columns are included even though they are not queried by the user because + // we always need to distinguish between having a row (with potentially only null values) and not + // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not + // actually requested by the user however (canSkipValue), we can skip the full cell if the cell + // timestamp is lower than the row one, because in that case, the row timestamp is enough proof + // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells. 
+ ColumnDefinition column = cell.column(); + if (column.isComplex()) + { + if (!includes(cell.path())) + return false; + + return !canSkipValue(cell.path()) || cell.timestamp() >= rowLiveness.timestamp(); + } + else + { + return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp(); + } } public boolean includes(CellPath path) { - return path == null || tester == null || tester.includes(path); + return path == null || tester == null || tester.fetches(path); } public boolean canSkipValue(ColumnDefinition column) { - return columnsToFetch != null && columnsToFetch.canSkipValue(column); + return columnsToFetch != null && !columnsToFetch.fetchedColumnIsQueried(column); } public boolean canSkipValue(CellPath path) { - return path != null && tester != null && tester.canSkipValue(path); + return path != null && tester != null && !tester.fetchedCellIsQueried(path); } public void startOfComplexColumn(ColumnDefinition column) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index ea929d7..9aa7cc4 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.FilteredRows; import org.apache.cassandra.db.transform.MoreRows; import org.apache.cassandra.db.transform.Transformation; @@ -128,6 +129,23 @@ public abstract class UnfilteredRowIterators } /** + * Filter the provided iterator to exclude cells that have been fetched but are not 
queried by the user + * (see ColumnFilter for details). + * + * @param iterator the iterator to filter. + * @param filter the {@code ColumnFilter} to use when deciding which columns are the ones queried by the + * user. This should be the filter that was used when querying {@code iterator}. + * @return the filtered iterator. + */ + public static UnfilteredRowIterator withOnlyQueriedData(UnfilteredRowIterator iterator, ColumnFilter filter) + { + if (filter.allFetchedColumnsAreQueried()) + return iterator; + + return Transformation.apply(iterator, new WithOnlyQueriedData(filter)); + } + + /** * Returns an iterator that concatenate two atom iterators. * This method assumes that both iterator are from the same partition and that the atom from * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterator http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 0451161..ae9d6e9 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -453,7 +453,7 @@ public class UnfilteredSerializer if (helper.includes(column)) { Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper); - if (!helper.isDropped(cell, false)) + if (helper.includes(cell, rowLiveness) && !helper.isDropped(cell, false)) builder.addCell(cell); } else @@ -479,7 +479,7 @@ public class UnfilteredSerializer while (--count >= 0) { Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper); - if (helper.includes(cell.path()) && !helper.isDropped(cell, true)) + if (helper.includes(cell, rowLiveness) && !helper.isDropped(cell, true)) builder.addCell(cell); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java new file mode 100644 index 0000000..3930f91 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.transform.Transformation; + +/** + * Function to skip cells (from an iterator) that are not part of those queried by the user + * according to the provided {@code ColumnFilter}. See {@link UnfilteredRowIterators#withOnlyQueriedData} + * for more details.
+ */ +public class WithOnlyQueriedData<I extends BaseRowIterator<?>> extends Transformation<I> +{ + private final ColumnFilter filter; + + public WithOnlyQueriedData(ColumnFilter filter) + { + this.filter = filter; + } + + @Override + protected Row applyToStatic(Row row) + { + return row.withOnlyQueriedData(filter); + } + + @Override + protected Row applyToRow(Row row) + { + return row.withOnlyQueriedData(filter); + } +}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index a28423d..d0b1256 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -364,7 +364,7 @@ public final class SchemaKeyspace mutationMap.put(key, mutation); } - mutation.add(PartitionUpdate.fromIterator(partition)); + mutation.add(PartitionUpdate.fromIterator(partition, cmd.columnFilter())); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index f3858d7..cc2786f 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -28,6 +28,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.*; import 
org.apache.cassandra.db.rows.*; @@ -197,10 +198,22 @@ public class DataResolver extends ResponseResolver public void onCell(int i, Clustering clustering, Cell merged, Cell original) { - if (merged != null && !merged.equals(original)) + if (merged != null && !merged.equals(original) && isQueried(merged)) currentRow(i, clustering).addCell(merged); } + private boolean isQueried(Cell cell) + { + // When we read, we may have some cells that have been fetched but are not selected by the user. Those cells may + // have empty values as an optimization (see CASSANDRA-10655) and hence they should not be included in the read-repair. + // This is fine since those columns are not actually requested by the user and are only present for the sake of CQL + // semantics (making sure we can always distinguish between a row that doesn't exist and one that does exist but has + // no value for the column requested by the user) and so it won't be unexpected by the user that those columns are + // not repaired. + ColumnDefinition column = cell.column(); + ColumnFilter filter = command.columnFilter(); + return column.isComplex() ?
filter.fetchedCellIsQueried(column, cell.path()) : filter.fetchedColumnIsQueried(column); + } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 6280f3a..081030d 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -38,6 +38,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -166,7 +167,7 @@ public class StreamReceiveTask extends StreamTask try (UnfilteredRowIterator rowIterator = scanner.next()) { //Apply unsafe (we will flush below before transaction is done) - new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe(); + new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fd74a036/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index ee86f9d..61d9b5f 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -941,7 +941,7 @@ public class CassandraServer implements Cassandra.Iface DecoratedKey dk = 
metadata.decorateKey(key); int nowInSec = FBUtilities.nowInSeconds(); - PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec)); + PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec), ColumnFilter.all(metadata)); // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates); @@ -1143,7 +1143,7 @@ public class CassandraServer implements Cassandra.Iface sortAndMerge(metadata, cells, nowInSec); DecoratedKey dk = metadata.decorateKey(key); - PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator())); + PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()), ColumnFilter.all(metadata)); // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);