http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java index 54feb85..33a0917 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import org.apache.cassandra.config.CFMetaData; @@ -26,6 +25,7 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -146,7 +146,7 @@ public interface ClusteringIndexFilter public interface Serializer { public void serialize(ClusteringIndexFilter filter, DataOutputPlus out, int version) throws IOException; - public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException; + public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException; public long serializedSize(ClusteringIndexFilter filter, int version); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java index f2cc46f..13329f3 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.util.*; @@ -27,6 +26,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTreeSet; @@ -94,6 +94,9 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter public boolean isFullyCoveredBy(CachedPartition partition) { + if (partition.isEmpty()) + return false; + // 'partition' contains all columns, so it covers our filter if our last clusterings // is smaller than the last in the cache return clusterings.comparator().compare(clusterings.last(), partition.lastRow().clustering()) <= 0; @@ -109,18 +112,18 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter { // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when // the range extend) and it's harmless to left them. - return new FilteringRowIterator(iterator) + return new AlteringUnfilteredRowIterator(iterator) { @Override - public FilteringRow makeRowFilter() + public Row computeNextStatic(Row row) { - return FilteringRow.columnsFilteringRow(columnFilter); + return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata()); } @Override - protected boolean includeRow(Row row) + public Row computeNext(Row row) { - return clusterings.contains(row.clustering()); + return clusterings.contains(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null; } }; } @@ -214,7 +217,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter sb.append(i++ == 0 ? "" : ", ").append(clustering.toString(metadata)); if (reversed) sb.append(", reversed"); - return sb.append(")").toString(); + return sb.append(')').toString(); } public String toCQLString(CFMetaData metadata) @@ -223,7 +226,7 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter return ""; StringBuilder sb = new StringBuilder(); - sb.append("(").append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(")"); + sb.append('(').append(ColumnDefinition.toCQLString(metadata.clusteringColumns())).append(')'); sb.append(clusterings.size() == 1 ? " = " : " IN ("); int i = 0; for (Clustering clustering : clusterings) @@ -258,13 +261,13 @@ public class ClusteringIndexNamesFilter extends AbstractClusteringIndexFilter private static class NamesDeserializer extends InternalDeserializer { - public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException + public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException { ClusteringComparator comparator = metadata.comparator; BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(comparator); int size = in.readInt(); for (int i = 0; i < size; i++) - clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes()).takeAlias()); + clusterings.add(Clustering.serializer.deserialize(in, version, comparator.subtypes())); return new ClusteringIndexNamesFilter(clusterings.build(), reversed); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java index 8fb319e..4f0e4e2 100644 --- a/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ClusteringIndexSliceFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.util.List; import java.nio.ByteBuffer; @@ -28,6 +27,7 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; /** @@ -91,24 +91,24 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter // Note that we don't filter markers because that's a bit trickier (we don't know in advance until when // the range extend) and it's harmless to leave them. - return new FilteringRowIterator(iterator) + return new AlteringUnfilteredRowIterator(iterator) { @Override - public FilteringRow makeRowFilter() + public boolean hasNext() { - return FilteringRow.columnsFilteringRow(columnFilter); + return !tester.isDone() && super.hasNext(); } @Override - protected boolean includeRow(Row row) + public Row computeNextStatic(Row row) { - return tester.includes(row.clustering()); + return columnFilter.fetchedColumns().statics.isEmpty() ? null : row.filter(columnFilter, iterator.metadata()); } @Override - public boolean hasNext() + public Row computeNext(Row row) { - return !tester.isDone() && super.hasNext(); + return tester.includes(row.clustering()) ? row.filter(columnFilter, iterator.metadata()) : null; } }; } @@ -170,7 +170,7 @@ public class ClusteringIndexSliceFilter extends AbstractClusteringIndexFilter private static class SliceDeserializer extends InternalDeserializer { - public ClusteringIndexFilter deserialize(DataInput in, int version, CFMetaData metadata, boolean reversed) throws IOException + public ClusteringIndexFilter deserialize(DataInputPlus in, int version, CFMetaData metadata, boolean reversed) throws IOException { Slices slices = Slices.serializer.deserialize(in, version, metadata); return new ClusteringIndexSliceFilter(slices, reversed); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index b98108d..084bad6 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.util.*; @@ -30,8 +29,8 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; /** * Represents which (non-PK) columns (and optionally which sub-part of a column for complex columns) are selected @@ -52,15 +51,6 @@ public class ColumnFilter { public static final Serializer serializer = new Serializer(); - private static final Comparator<ColumnSubselection> valueComparator = new Comparator<ColumnSubselection>() - { - public int compare(ColumnSubselection s1, ColumnSubselection s2) - { - assert s1.column().name.equals(s2.column().name); - return s1.column().cellPathComparator().compare(s1.minIncludedPath(), s2.minIncludedPath()); - } - }; - // Distinguish between the 2 cases described above: if 'isFetchAll' is true, then all columns will be retrieved // by the query, but the values for column/cells not selected by 'selection' and 'subSelections' will be skipped. // Otherwise, only the column/cells returned by 'selection' and 'subSelections' will be returned at all. @@ -115,6 +105,11 @@ public class ColumnFilter return isFetchAll ? metadata.partitionColumns() : selection; } + public boolean includesAllColumns() + { + return isFetchAll; + } + /** * Whether the provided column is selected by this selection. */ @@ -144,7 +139,7 @@ public class ColumnFilter return true; for (ColumnSubselection subSel : s) - if (subSel.includes(cell.path())) + if (subSel.compareInclusionOf(cell.path()) == 0) return true; return false; @@ -163,7 +158,7 @@ public class ColumnFilter return false; for (ColumnSubselection subSel : s) - if (subSel.includes(path)) + if (subSel.compareInclusionOf(path) == 0) return false; return true; @@ -182,7 +177,7 @@ public class ColumnFilter if (s.isEmpty()) return null; - return new Tester(s.iterator()); + return new Tester(isFetchAll, s.iterator()); } /** @@ -205,46 +200,43 @@ public class ColumnFilter public static class Tester { + private final boolean isFetchAll; private ColumnSubselection current; private final Iterator<ColumnSubselection> iterator; - private Tester(Iterator<ColumnSubselection> iterator) + private Tester(boolean isFetchAll, Iterator<ColumnSubselection> iterator) { + this.isFetchAll = isFetchAll; this.iterator = iterator; } public boolean includes(CellPath path) { - while (current == null) - { - if (!iterator.hasNext()) - return false; - - current = iterator.next(); - if (current.includes(path)) - return true; - - if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0) - current = null; - } - return false; + return isFetchAll || includedBySubselection(path); } public boolean canSkipValue(CellPath path) { - while (current == null) + return isFetchAll && !includedBySubselection(path); + } + + private boolean includedBySubselection(CellPath path) + { + while (current != null || iterator.hasNext()) { - if (!iterator.hasNext()) - return false; + if (current == null) + current = iterator.next(); - current = iterator.next(); - if (current.includes(path)) + int cmp = current.compareInclusionOf(path); + if (cmp == 0) // The path is included + return true; + else if (cmp < 0) // The path is before this sub-selection, it's not included by any return false; - if (current.column().cellPathComparator().compare(current.maxIncludedPath(), path) < 0) - current = null; + // the path is after this sub-selection, we need to check the next one. + current = null; } - return true; + return false; } } @@ -302,7 +294,7 @@ public class ColumnFilter SortedSetMultimap<ColumnIdentifier, ColumnSubselection> s = null; if (subSelections != null) { - s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator); + s = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder()); for (ColumnSubselection subSelection : subSelections) s.put(subSelection.column().name, subSelection); } @@ -317,6 +309,9 @@ public class ColumnFilter if (selection == null) return "*"; + if (selection.isEmpty()) + return ""; + Iterator<ColumnDefinition> defs = selection.selectOrderIterator(); StringBuilder sb = new StringBuilder(); appendColumnDef(sb, defs.next()); @@ -351,7 +346,7 @@ public class ColumnFilter private static final int HAS_SELECTION_MASK = 0x02; private static final int HAS_SUB_SELECTIONS_MASK = 0x04; - private int makeHeaderByte(ColumnFilter selection) + private static int makeHeaderByte(ColumnFilter selection) { return (selection.isFetchAll ? IS_FETCH_ALL_MASK : 0) | (selection.selection != null ? HAS_SELECTION_MASK : 0) @@ -376,7 +371,7 @@ public class ColumnFilter } } - public ColumnFilter deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + public ColumnFilter deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { int header = in.readUnsignedByte(); boolean isFetchAll = (header & IS_FETCH_ALL_MASK) != 0; @@ -394,7 +389,7 @@ public class ColumnFilter SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null; if (hasSubSelections) { - subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), valueComparator); + subSelections = TreeMultimap.create(Comparator.<ColumnIdentifier>naturalOrder(), Comparator.<ColumnSubselection>naturalOrder()); int size = in.readUnsignedShort(); for (int i = 0; i < size; i++) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java index 652e27c..e45dbee 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnSubselection.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.filter; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Comparator; @@ -29,6 +28,7 @@ import org.apache.cassandra.db.rows.CellPath; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; @@ -38,7 +38,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; * This only make sense for complex column. For those, this allow for instance * to select only a slice of a map. */ -public abstract class ColumnSubselection +public abstract class ColumnSubselection implements Comparable<ColumnSubselection> { public static final Serializer serializer = new Serializer(); @@ -72,9 +72,19 @@ public abstract class ColumnSubselection protected abstract Kind kind(); - public abstract CellPath minIncludedPath(); - public abstract CellPath maxIncludedPath(); - public abstract boolean includes(CellPath path); + protected abstract CellPath comparisonPath(); + + public int compareTo(ColumnSubselection other) + { + assert other.column().name.equals(column().name); + return column().cellPathComparator().compare(comparisonPath(), other.comparisonPath()); + } + + /** + * Given a path, return -1 if the path is before anything selected by this subselection, 0 if it is selected by this + * subselection and 1 if the path is after anything selected by this subselection. + */ + public abstract int compareInclusionOf(CellPath path); private static class Slice extends ColumnSubselection { @@ -93,20 +103,20 @@ public abstract class ColumnSubselection return Kind.SLICE; } - public CellPath minIncludedPath() + public CellPath comparisonPath() { return from; } - public CellPath maxIncludedPath() - { - return to; - } - - public boolean includes(CellPath path) + public int compareInclusionOf(CellPath path) { Comparator<CellPath> cmp = column.cellPathComparator(); - return cmp.compare(from, path) <= 0 && cmp.compare(path, to) <= 0; + if (cmp.compare(path, from) < 0) + return -1; + else if (cmp.compare(to, path) < 0) + return 1; + else + return 0; } @Override @@ -133,20 +143,14 @@ public abstract class ColumnSubselection return Kind.ELEMENT; } - public CellPath minIncludedPath() - { - return element; - } - - public CellPath maxIncludedPath() + public CellPath comparisonPath() { return element; } - public boolean includes(CellPath path) + public int compareInclusionOf(CellPath path) { - Comparator<CellPath> cmp = column.cellPathComparator(); - return cmp.compare(element, path) == 0; + return column.cellPathComparator().compare(path, element); } @Override @@ -180,7 +184,7 @@ public abstract class ColumnSubselection throw new AssertionError(); } - public ColumnSubselection deserialize(DataInput in, int version, CFMetaData metadata) throws IOException + public ColumnSubselection deserialize(DataInputPlus in, int version, CFMetaData metadata) throws IOException { ByteBuffer name = ByteBufferUtil.readWithShortLength(in); ColumnDefinition column = metadata.getColumnDefinition(name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/DataLimits.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/DataLimits.java b/src/java/org/apache/cassandra/db/filter/DataLimits.java index 76e29ec..206afa4 100644 --- a/src/java/org/apache/cassandra/db/filter/DataLimits.java +++ b/src/java/org/apache/cassandra/db/filter/DataLimits.java @@ -115,8 +115,7 @@ public abstract class DataLimits * The max number of results this limits enforces. * <p> * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for - * thrift, it means cells. The {@link #countsCells} allows to distinguish between the two cases if - * needed. + * thrift, it means cells. * * @return the maximum number of results this limits enforces. */ @@ -124,8 +123,6 @@ public abstract class DataLimits public abstract int perPartitionCount(); - public abstract boolean countsCells(); - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) { return new CountingUnfilteredPartitionIterator(iter, newCounter(nowInSec, false)); @@ -269,11 +266,6 @@ public abstract class DataLimits return perPartitionLimit; } - public boolean countsCells() - { - return false; - } - public float estimateTotalResults(ColumnFamilyStore cfs) { // TODO: we should start storing stats on the number of rows (instead of the number of cells, which @@ -353,7 +345,7 @@ public abstract class DataLimits { sb.append("LIMIT ").append(rowLimit); if (perPartitionLimit != Integer.MAX_VALUE) - sb.append(" "); + sb.append(' '); } if (perPartitionLimit != Integer.MAX_VALUE) @@ -511,11 +503,6 @@ public abstract class DataLimits return cellPerPartitionLimit; } - public boolean countsCells() - { - return true; - } - public float estimateTotalResults(ColumnFamilyStore cfs) { // remember that getMeansColumns returns a number of cells: we should clean nomenclature @@ -572,7 +559,7 @@ public abstract class DataLimits public void newRow(Row row) { - for (Cell cell : row) + for (Cell cell : row.cells()) { if (assumeLiveData || cell.isLive(nowInSec)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 8f34efb..5a49bca 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -137,11 +137,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> if (metadata.isCompound()) { List<ByteBuffer> values = CompositeType.splitName(name); - return new SimpleClustering(values.toArray(new ByteBuffer[metadata.comparator.size()])); + return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()])); } else { - return new SimpleClustering(name); + return new Clustering(name); } } @@ -165,28 +165,18 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> super(expressions); } - public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, final int nowInSec) + public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec) { if (expressions.isEmpty()) return iter; - return new WrappingUnfilteredPartitionIterator(iter) + return new AlteringUnfilteredPartitionIterator(iter) { - @Override - public UnfilteredRowIterator computeNext(final UnfilteredRowIterator iter) + protected Row computeNext(DecoratedKey partitionKey, Row row) { - return new FilteringRowIterator(iter) - { - // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them. - // (we should however not filter them in the output of the method, hence it's not used as row filter for the - // FilteringRowIterator) - private final TombstoneFilteringRow filter = new TombstoneFilteringRow(nowInSec); - - protected boolean includeRow(Row row) - { - return CQLFilter.this.isSatisfiedBy(iter.partitionKey(), filter.setTo(row)); - } - }; + // We filter tombstones when passing the row to isSatisfiedBy so that the method doesn't have to bother with them. + Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + return purged != null && CQLFilter.this.isSatisfiedBy(partitionKey, purged) ? row : null; } }; } @@ -515,10 +505,9 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> CollectionType<?> type = (CollectionType<?>)column.type; if (column.isComplex()) { - Iterator<Cell> iter = row.getCells(column); - while (iter.hasNext()) + ComplexColumnData complexData = row.getComplexColumnData(column); + for (Cell cell : complexData) { - Cell cell = iter.next(); if (type.kind == CollectionType.Kind.SET) { if (type.nameComparator().compare(cell.path().get(0), value) == 0) @@ -720,7 +709,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> // In thrift, we actually allow expression on non-defined columns for the sake of filtering. To accomodate // this we create a "fake" definition. This is messy but it works so is probably good enough. - return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type, null); + return ColumnDefinition.regularDef(metadata, name, metadata.compactValueColumn().type); } public boolean isSatisfiedBy(DecoratedKey partitionKey, Row row) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index 794744a..c3a3c08 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -108,17 +108,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec public void deleteForCleanup(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec) { - delete(rowKey, clustering, cell.value(), cell.path(), new SimpleDeletionTime(cell.livenessInfo().timestamp(), nowInSec), opGroup); + delete(rowKey, clustering, cell.value(), cell.path(), new DeletionTime(cell.timestamp(), nowInSec), opGroup); } public void delete(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path, DeletionTime deletion, OpOrder.Group opGroup) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path)); - PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1); - Row.Writer writer = upd.writer(); - Rows.writeClustering(makeIndexClustering(rowKey, clustering, path), writer); - writer.writeRowDeletion(deletion); - writer.endOfRow(); + + Row row = ArrayBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null); if (logger.isDebugEnabled()) logger.debug("removed index entry for cleaned-up value {}:{}", valueKey, upd); @@ -126,18 +125,16 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup) { - insert(rowKey, clustering, cell, cell.livenessInfo(), opGroup); + insert(rowKey, clustering, cell, LivenessInfo.create(cell.timestamp(), cell.ttl(), cell.localDeletionTime()), opGroup); } public void insert(ByteBuffer rowKey, Clustering clustering, Cell cell, LivenessInfo info, OpOrder.Group opGroup) { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell)); - PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, valueKey, PartitionColumns.NONE, 1); - Row.Writer writer = upd.writer(); - Rows.writeClustering(makeIndexClustering(rowKey, clustering, cell), writer); - writer.writePartitionKeyLivenessInfo(info); - writer.endOfRow(); + Row row = ArrayBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); + if (logger.isDebugEnabled()) logger.debug("applying index row {} in {}", indexCfs.metadata.getKeyValidator().getString(valueKey.getKey()), upd); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java index ab8e688..897aa9c 100644 --- a/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/PerColumnSecondaryIndex.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.index; import java.nio.ByteBuffer; +import java.util.Iterator; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; @@ -87,17 +88,18 @@ public abstract class PerColumnSecondaryIndex extends SecondaryIndex long timestamp = row.primaryKeyLivenessInfo().timestamp(); int ttl = row.primaryKeyLivenessInfo().ttl(); - for (Cell cell : row) + for (Cell cell : row.cells()) { - if (cell.isLive(nowInSec) && cell.livenessInfo().timestamp() > timestamp) + if (cell.isLive(nowInSec) && cell.timestamp() > timestamp) { - timestamp = cell.livenessInfo().timestamp(); - ttl = cell.livenessInfo().ttl(); + timestamp = cell.timestamp(); + ttl = cell.ttl(); } } maybeIndex(key.getKey(), clustering, timestamp, ttl, opGroup, nowInSec); } - for (Cell cell : row) + + for (Cell cell : row.cells()) { if (!indexes(cell.column())) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 1bd5452..aaefc9c 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -18,15 +18,7 @@ package org.apache.cassandra.db.index; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.IdentityHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; @@ -469,7 +461,8 @@ public class SecondaryIndexManager if (!row.deletion().isLive()) for (PerColumnSecondaryIndex index : indexes) index.maybeDelete(key, clustering, row.deletion(), opGroup); - for (Cell cell : row) + + for (Cell cell : row.cells()) { for (PerColumnSecondaryIndex index : indexes) { @@ -636,8 +629,7 @@ public class SecondaryIndexManager // Completely identical cells (including expiring columns with // identical ttl & localExpirationTime) will not get this far due // to the oldCell.equals(newCell) in StandardUpdater.update - return !oldCell.value().equals(newCell.value()) - || oldCell.livenessInfo().timestamp() != newCell.livenessInfo().timestamp(); + return !oldCell.value().equals(newCell.value()) || oldCell.timestamp() != newCell.timestamp(); } private Set<String> filterByColumn(Set<String> idxNames) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java index 1d978a2..d4ca707 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexSearcher.java @@ -112,7 +112,7 @@ public abstract class SecondaryIndexSearcher NavigableSet<Clustering> requested = ((ClusteringIndexNamesFilter)filter).requestedRows(); BTreeSet.Builder<Clustering> clusterings = BTreeSet.builder(index.getIndexComparator()); for (Clustering c : requested) - clusterings.add(index.makeIndexClustering(pk, c, (Cell)null).takeAlias()); + clusterings.add(index.makeIndexClustering(pk, c, (Cell)null)); return new ClusteringIndexNamesFilter(clusterings.build(), filter.isReversed()); } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index 7a40a90..e073802 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -112,11 +112,8 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec) { - PartitionUpdate upd = new PartitionUpdate(indexCfs.metadata, entry.indexValue, PartitionColumns.NONE, 1); - Row.Writer writer = upd.writer(); - Rows.writeClustering(entry.indexClustering, writer); - writer.writeRowDeletion(new SimpleDeletionTime(entry.timestamp, nowInSec)); - writer.endOfRow(); + Row row = ArrayBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec)); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row); indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null); if (logger.isDebugEnabled()) @@ -159,10 +156,10 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public IndexedEntry(DecoratedKey indexValue, Clustering indexClustering, long timestamp, ByteBuffer indexedKey, Clustering indexedEntryClustering) { this.indexValue = indexValue; - this.indexClustering = indexClustering.takeAlias(); + this.indexClustering = indexClustering; this.timestamp = timestamp; this.indexedKey = indexedKey; - this.indexedEntryClustering = indexedEntryClustering.takeAlias(); + this.indexedEntryClustering = indexedEntryClustering; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java index aa58511..6529ad9 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java @@ -118,7 +118,7 @@ public class CompositesIndexOnClusteringKey extends CompositesIndex public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec) { if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null) - insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup); + insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java index 5af842c..30391cf 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java @@ -90,10 +90,9 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - Iterator<Cell> iter = data.getCells(columnDef); - while (iter.hasNext()) + ComplexColumnData complexData = data.getComplexColumnData(columnDef); + for (Cell cell : complexData) { - Cell cell = iter.next(); if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0) return false; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java index d48e58b..a93f8e1 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java @@ -93,7 +93,7 @@ public class CompositesIndexOnPartitionKey extends CompositesIndex @Override public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec) { - insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup); + insert(partitionKey, clustering, null, LivenessInfo.create(indexCfs.metadata, timestamp, ttl, nowInSec), opGroup); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 029dd3c..ce92164 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -171,49 +171,20 @@ public class CompositesSearcher extends SecondaryIndexSearcher final OpOrder.Group writeOp, final int nowInSec) { - return new WrappingUnfilteredRowIterator(dataIter) + return new AlteringUnfilteredRowIterator(dataIter) { private int entriesIdx; - private Unfiltered next; @Override - public boolean hasNext() - { - return prepareNext(); - } - - @Override - public Unfiltered next() + protected Row computeNext(Row row) { - if (next == null) - prepareNext(); + CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec); + if (!index.isStale(row, indexValue, nowInSec)) + return row; - Unfiltered toReturn = next; - next = null; - return toReturn; - } - - private boolean prepareNext() - { - if (next != null) - return true; - - while (super.hasNext()) - { - next = super.next(); - if (next.kind() != Unfiltered.Kind.ROW) - return true; - - Row row = (Row)next; - CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec); - if (!index.isStale(row, indexValue, nowInSec)) - return true; - - // The entry is stale: delete the entry and ignore otherwise - index.delete(entry, writeOp, nowInSec); - next = null; - } - return false; + // The entry is stale: delete the entry and ignore otherwise + index.delete(entry, writeOp, nowInSec); + return null; } private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 6b53640..118fb75 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -138,7 +138,7 @@ public class KeysSearcher extends SecondaryIndexSearcher // is the indexed name. Ans so we need to materialize the partition. ArrayBackedPartition result = ArrayBackedPartition.create(iterator); iterator.close(); - Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes)); + Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes)); Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn()); return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec) ? null @@ -173,10 +173,10 @@ public class KeysSearcher extends SecondaryIndexSearcher { // Index is stale, remove the index entry and ignore index.delete(partitionKey.getKey(), - new SimpleClustering(index.indexedColumn().name.bytes), + new Clustering(index.indexedColumn().name.bytes), indexedValue, null, - new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), + new DeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), writeOp); return true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/AbstractType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index 78ead36..258a8a5 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.marshal; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -36,6 +35,7 @@ import org.apache.cassandra.serializers.MarshalException; import org.github.jamm.Unmetered; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.utils.ByteBufferUtil; @@ -325,7 +325,7 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> if (valueLengthIfFixed() >= 0) out.write(value); else - ByteBufferUtil.writeWithLength(value, out); + ByteBufferUtil.writeWithVIntLength(value, out); } public long writtenLength(ByteBuffer value) @@ -333,25 +333,25 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> assert value.hasRemaining(); return valueLengthIfFixed() >= 0 ? value.remaining() - : TypeSizes.sizeofWithLength(value); + : TypeSizes.sizeofWithVIntLength(value); } - public ByteBuffer readValue(DataInput in) throws IOException + public ByteBuffer readValue(DataInputPlus in) throws IOException { int length = valueLengthIfFixed(); if (length >= 0) return ByteBufferUtil.read(in, length); else - return ByteBufferUtil.readWithLength(in); + return ByteBufferUtil.readWithVIntLength(in); } - public void skipValue(DataInput in) throws IOException + public void skipValue(DataInputPlus in) throws IOException { int length = valueLengthIfFixed(); - if (length < 0) - length = in.readInt(); - - FileUtils.skipBytesFully(in, length); + if (length >= 0) + FileUtils.skipBytesFully(in, length); + else + ByteBufferUtil.skipWithVIntLength(in); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/marshal/CollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java index a850305..9a096d0 100644 --- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java +++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; -import java.io.DataInput; import java.io.IOException; import java.util.List; import java.util.Iterator; @@ -34,6 +33,7 @@ import org.apache.cassandra.cql3.Sets; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.CollectionSerializer; @@ -236,23 +236,22 @@ public abstract class CollectionType<T> extends AbstractType<T> { public void serialize(CellPath path, DataOutputPlus out) throws IOException { - ByteBufferUtil.writeWithLength(path.get(0), out); + ByteBufferUtil.writeWithVIntLength(path.get(0), out); } - public CellPath deserialize(DataInput in) throws IOException + public CellPath deserialize(DataInputPlus in) throws IOException { - return CellPath.create(ByteBufferUtil.readWithLength(in)); + return CellPath.create(ByteBufferUtil.readWithVIntLength(in)); } public long serializedSize(CellPath path) { - return TypeSizes.sizeofWithLength(path.get(0)); + return ByteBufferUtil.serializedSizeWithVIntLength(path.get(0)); } - public void skip(DataInput in) throws IOException + public void skip(DataInputPlus in) throws IOException { - int length = in.readInt(); - FileUtils.skipBytesFully(in, length); + ByteBufferUtil.skipWithVIntLength(in); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java deleted file mode 100644 index 6775cf1..0000000 --- a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java +++ /dev/null @@ -1,850 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.utils.SearchIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Abstract common class for all non-thread safe Partition implementations. - */ -public abstract class AbstractPartitionData implements Partition, Iterable<Row> -{ - private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class); - - protected final CFMetaData metadata; - protected final DecoratedKey key; - - protected final DeletionInfo deletionInfo; - protected final PartitionColumns columns; - - protected Row staticRow; - - protected int rows; - - // The values for the clustering columns of the rows contained in this partition object. If - // clusteringSize is the size of the clustering comparator for this table, clusterings has size - // clusteringSize * rows where rows is the number of rows stored, and row i has it's clustering - // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)). - protected ByteBuffer[] clusterings; - - // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i). - protected final LivenessInfoArray livenessInfos; - // The row deletion for the rows of this partition (row i has its row deletion at index i). - protected final DeletionTimeArray deletions; - - // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition. - protected final RowDataBlock data; - - // Stats over the rows stored in this partition. - private final RowStats.Collector statsCollector = new RowStats.Collector(); - - // The maximum timestamp for any data contained in this partition. - protected long maxTimestamp = Long.MIN_VALUE; - - private AbstractPartitionData(CFMetaData metadata, - DecoratedKey key, - DeletionInfo deletionInfo, - ByteBuffer[] clusterings, - LivenessInfoArray livenessInfos, - DeletionTimeArray deletions, - PartitionColumns columns, - RowDataBlock data) - { - this.metadata = metadata; - this.key = key; - this.deletionInfo = deletionInfo; - this.clusterings = clusterings; - this.livenessInfos = livenessInfos; - this.deletions = deletions; - this.columns = columns; - this.data = data; - - collectStats(deletionInfo.getPartitionDeletion()); - Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false); - while (iter.hasNext()) - collectStats(iter.next().deletionTime()); - } - - protected AbstractPartitionData(CFMetaData metadata, - DecoratedKey key, - DeletionInfo deletionInfo, - PartitionColumns columns, - RowDataBlock data, - int initialRowCapacity) - { - this(metadata, - key, - deletionInfo, - new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()], - new LivenessInfoArray(initialRowCapacity), - new DeletionTimeArray(initialRowCapacity), - columns, - data); - } - - protected AbstractPartitionData(CFMetaData metadata, - DecoratedKey key, - DeletionTime partitionDeletion, - PartitionColumns columns, - int initialRowCapacity, - boolean sortable) - { - this(metadata, - key, - new DeletionInfo(partitionDeletion.takeAlias()), - columns, - new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()), - initialRowCapacity); - } - - private void collectStats(DeletionTime dt) - { - statsCollector.updateDeletionTime(dt); - maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt()); - } - - private void collectStats(LivenessInfo info) - { - statsCollector.updateTimestamp(info.timestamp()); - statsCollector.updateTTL(info.ttl()); - statsCollector.updateLocalDeletionTime(info.localDeletionTime()); - maxTimestamp = Math.max(maxTimestamp, info.timestamp()); - } - - public CFMetaData metadata() - { - return metadata; - } - - public DecoratedKey partitionKey() - { - return key; - } - - public DeletionTime partitionLevelDeletion() - { - return deletionInfo.getPartitionDeletion(); - } - - public PartitionColumns columns() - { - return columns; - } - - public Row staticRow() - { - return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow; - } - - public RowStats stats() - { - return statsCollector.get(); - } - - /** - * The deletion info for the partition update. - * - * <b>warning:</b> the returned object should be used in a read-only fashion. In particular, - * it should not be used to add new range tombstones to this deletion. For that, - * {@link addRangeTombstone} should be used instead. The reason being that adding directly to - * the returned object would bypass some stats collection that {@code addRangeTombstone} does. - * - * @return the deletion info for the partition update for use as read-only. - */ - public DeletionInfo deletionInfo() - { - // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We - // could add the option of providing a read-only view of a DeletionInfo instead. - return deletionInfo; - } - - public void addPartitionDeletion(DeletionTime deletionTime) - { - collectStats(deletionTime); - deletionInfo.add(deletionTime); - } - - public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion) - { - addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias())); - } - - public void addRangeTombstone(RangeTombstone range) - { - collectStats(range.deletionTime()); - deletionInfo.add(range, metadata.comparator); - } - - /** - * Swap row i and j. - * - * This is only used when we need to reorder rows because those were not added in clustering order, - * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. This method - * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but - * it should really only be used by subclasses (and with care) in practice. - */ - public void swap(int i, int j) - { - int cs = metadata.clusteringColumns().size(); - for (int k = 0; k < cs; k++) - { - ByteBuffer tmp = clusterings[j * cs + k]; - clusterings[j * cs + k] = clusterings[i * cs + k]; - clusterings[i * cs + k] = tmp; - } - - livenessInfos.swap(i, j); - deletions.swap(i, j); - data.swap(i, j); - } - - protected void merge(int i, int j, int nowInSec) - { - data.merge(i, j, nowInSec); - if (livenessInfos.timestamp(i) > livenessInfos.timestamp(j)) - livenessInfos.move(i, j); - if (deletions.supersedes(i, j)) - deletions.move(i, j); - } - - protected void move(int i, int j) - { - int cs = metadata.clusteringColumns().size(); - for (int k = 0; k < cs; k++) - clusterings[j * cs + k] = clusterings[i * cs + k]; - data.move(i, j); - livenessInfos.move(i, j); - deletions.move(i, j); - } - - public int rowCount() - { - return rows; - } - - public boolean isEmpty() - { - return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty(); - } - - protected void clear() - { - rows = 0; - Arrays.fill(clusterings, null); - livenessInfos.clear(); - deletions.clear(); - data.clear(); - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(); - CFMetaData metadata = metadata(); - sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s", - metadata.ksName, - metadata.cfName, - metadata.getKeyValidator().getString(partitionKey().getKey()), - columns(), - deletionInfo)); - - if (staticRow() != Rows.EMPTY_STATIC_ROW) - sb.append("\n ").append(staticRow().toString(metadata, true)); - - // We use createRowIterator() directly instead of iterator() because that avoids - // sorting for PartitionUpdate (which inherit this method) and that is useful because - // 1) it can help with debugging and 2) we can't write after sorting but we want to - // be able to print an update while we build it (again for debugging) - Iterator<Row> iterator = createRowIterator(null, false); - while (iterator.hasNext()) - sb.append("\n ").append(iterator.next().toString(metadata, true)); - - return sb.toString(); - } - - protected void reverse() - { - for (int i = 0; i < rows / 2; i++) - swap(i, rows - 1 - i); - } - - public Row getRow(Clustering clustering) - { - Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); - // Note that for statics, this will never return null, this will return an empty row. However, - // it's more consistent for this method to return null if we don't really have a static row. - return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; - } - - /** - * Returns an iterator that iterators over the rows of this update in clustering order. - * - * @return an iterator over the rows of this update. - */ - public Iterator<Row> iterator() - { - return createRowIterator(null, false); - } - - public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed) - { - final RowIterator iter = createRowIterator(columns, reversed); - return new SearchIterator<Clustering, Row>() - { - public boolean hasNext() - { - return iter.hasNext(); - } - - public Row next(Clustering key) - { - if (key == Clustering.STATIC_CLUSTERING) - { - if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty()) - return Rows.EMPTY_STATIC_ROW; - - return FilteringRow.columnsFilteringRow(columns).setTo(staticRow()); - } - - return iter.seekTo(key) ? iter.next() : null; - } - }; - } - - public UnfilteredRowIterator unfilteredIterator() - { - return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); - } - - public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed) - { - return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed)); - } - - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() - { - return sliceableUnfilteredIterator(ColumnFilter.selection(columns()), false); - } - - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed) - { - return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed) - { - private final RowIterator rowIterator = createRowIterator(selection, reversed); - private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed); - - protected Unfiltered computeNext() - { - if (!mergeIterator.isSet()) - mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed)); - - return mergeIterator.hasNext() ? mergeIterator.next() : endOfData(); - } - - public Iterator<Unfiltered> slice(Slice slice) - { - return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed)); - } - }; - } - - private RowIterator createRowIterator(ColumnFilter columns, boolean reversed) - { - return reversed ? new ReverseRowIterator(columns) : new ForwardRowIterator(columns); - } - - /** - * An iterator over the rows of this partition that reuse the same row object. - */ - private abstract class RowIterator extends UnmodifiableIterator<Row> - { - protected final InternalReusableClustering clustering = new InternalReusableClustering(); - protected final InternalReusableRow reusableRow; - protected final FilteringRow filter; - - protected int next; - - protected RowIterator(final ColumnFilter columns) - { - this.reusableRow = new InternalReusableRow(clustering); - this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns); - } - - /* - * Move the iterator so that row {@code name} is returned next by {@code next} if that - * row exists. Otherwise the first row sorting after {@code name} will be returned. - * Returns whether {@code name} was found or not. - */ - public abstract boolean seekTo(Clustering name); - - public abstract Iterator<Row> slice(Slice slice); - - protected Row setRowTo(int row) - { - reusableRow.setTo(row); - return filter == null ? reusableRow : filter.setTo(reusableRow); - } - - /** - * Simple binary search. - */ - protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex) - { - int low = fromIndex; - int mid = toIndex; - int high = mid - 1; - int result = -1; - while (low <= high) - { - mid = (low + high) >> 1; - if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0) - low = mid + 1; - else if (result == 0) - return mid; - else - high = mid - 1; - } - return -mid - (result < 0 ? 1 : 2); - } - } - - private class ForwardRowIterator extends RowIterator - { - private ForwardRowIterator(ColumnFilter columns) - { - super(columns); - this.next = 0; - } - - public boolean hasNext() - { - return next < rows; - } - - public Row next() - { - return setRowTo(next++); - } - - public boolean seekTo(Clustering name) - { - if (next >= rows) - return false; - - int idx = binarySearch(name, next, rows); - next = idx >= 0 ? idx : -idx - 1; - return idx >= 0; - } - - public Iterator<Row> slice(Slice slice) - { - int sidx = binarySearch(slice.start(), next, rows); - final int start = sidx >= 0 ? sidx : -sidx - 1; - if (start >= rows) - return Collections.emptyIterator(); - - int eidx = binarySearch(slice.end(), start, rows); - // The insertion point is the first element greater than slice.end(), so we want the previous index - final int end = eidx >= 0 ? eidx : -eidx - 2; - - // Remember the end to speed up potential further slice search - next = end; - - if (start > end) - return Collections.emptyIterator(); - - return new AbstractIterator<Row>() - { - private int i = start; - - protected Row computeNext() - { - if (i >= rows || i > end) - return endOfData(); - - return setRowTo(i++); - } - }; - } - } - - private class ReverseRowIterator extends RowIterator - { - private ReverseRowIterator(ColumnFilter columns) - { - super(columns); - this.next = rows - 1; - } - - public boolean hasNext() - { - return next >= 0; - } - - public Row next() - { - return setRowTo(next--); - } - - public boolean seekTo(Clustering name) - { - // We only use that method with forward iterators. - throw new UnsupportedOperationException(); - } - - public Iterator<Row> slice(Slice slice) - { - int sidx = binarySearch(slice.end(), 0, next + 1); - // The insertion point is the first element greater than slice.end(), so we want the previous index - final int start = sidx >= 0 ? sidx : -sidx - 2; - if (start < 0) - return Collections.emptyIterator(); - - int eidx = binarySearch(slice.start(), 0, start + 1); - final int end = eidx >= 0 ? eidx : -eidx - 1; - - // Remember the end to speed up potential further slice search - next = end; - - if (start < end) - return Collections.emptyIterator(); - - return new AbstractIterator<Row>() - { - private int i = start; - - protected Row computeNext() - { - if (i < 0 || i < end) - return endOfData(); - - return setRowTo(i--); - } - }; - } - } - - /** - * A reusable view over the clustering of this partition. - */ - protected class InternalReusableClustering extends Clustering - { - final int size = metadata.clusteringColumns().size(); - private int base; - - public int size() - { - return size; - } - - public Clustering setTo(int row) - { - base = row * size; - return this; - } - - public ByteBuffer get(int i) - { - return clusterings[base + i]; - } - - public ByteBuffer[] getRawValues() - { - ByteBuffer[] values = new ByteBuffer[size]; - for (int i = 0; i < size; i++) - values[i] = get(i); - return values; - } - }; - - /** - * A reusable view over the rows of this partition. - */ - protected class InternalReusableRow extends AbstractReusableRow - { - private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor(); - private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor(); - private final InternalReusableClustering clustering; - - private int row; - - public InternalReusableRow() - { - this(new InternalReusableClustering()); - } - - public InternalReusableRow(InternalReusableClustering clustering) - { - this.clustering = clustering; - } - - protected RowDataBlock data() - { - return data; - } - - public Row setTo(int row) - { - this.clustering.setTo(row); - this.liveness.setTo(livenessInfos, row); - this.deletion.setTo(deletions, row); - this.row = row; - return this; - } - - protected int row() - { - return row; - } - - public Clustering clustering() - { - return clustering; - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return liveness; - } - - public DeletionTime deletion() - { - return deletion; - } - }; - - private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator - { - private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder) - { - super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats()); - } - } - - /** - * A row writer to add rows to this partition. - */ - protected class Writer extends RowDataBlock.Writer - { - private int clusteringBase; - - private int simpleColumnsSetInRow; - private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>(); - - public Writer(boolean inOrderCells) - { - super(data, inOrderCells); - } - - public void writeClusteringValue(ByteBuffer value) - { - ensureCapacity(row); - clusterings[clusteringBase++] = value; - } - - public void writePartitionKeyLivenessInfo(LivenessInfo info) - { - ensureCapacity(row); - livenessInfos.set(row, info); - collectStats(info); - } - - public void writeRowDeletion(DeletionTime deletion) - { - ensureCapacity(row); - if (!deletion.isLive()) - deletions.set(row, deletion); - - collectStats(deletion); - } - - @Override - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - ensureCapacity(row); - collectStats(info); - - if (column.isComplex()) - complexColumnsSetInRow.add(column); - else - ++simpleColumnsSetInRow; - - super.writeCell(column, isCounter, value, info, path); - } - - @Override - public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion) - { - ensureCapacity(row); - collectStats(complexDeletion); - - super.writeComplexDeletion(c, complexDeletion); - } - - @Override - public void endOfRow() - { - super.endOfRow(); - ++rows; - - statsCollector.updateColumnSetPerRow(simpleColumnsSetInRow + complexColumnsSetInRow.size()); - - simpleColumnsSetInRow = 0; - complexColumnsSetInRow.clear(); - } - - public int currentRow() - { - return row; - } - - private void ensureCapacity(int rowToSet) - { - int originalCapacity = livenessInfos.size(); - if (rowToSet < originalCapacity) - return; - - int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet); - - int clusteringSize = metadata.clusteringColumns().size(); - - clusterings = Arrays.copyOf(clusterings, newCapacity * clusteringSize); - - livenessInfos.resize(newCapacity); - deletions.resize(newCapacity); - } - - @Override - public Writer reset() - { - super.reset(); - clusteringBase = 0; - simpleColumnsSetInRow = 0; - complexColumnsSetInRow.clear(); - return this; - } - } - - /** - * A range tombstone marker writer to add range tombstone markers to this partition. - */ - protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer - { - private final boolean reversed; - - private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()]; - private int size; - private RangeTombstone.Bound.Kind nextKind; - - private Slice.Bound openBound; - private DeletionTime openDeletion; - - public RangeTombstoneCollector(boolean reversed) - { - this.reversed = reversed; - } - - public void writeClusteringValue(ByteBuffer value) - { - nextValues[size++] = value; - } - - public void writeBoundKind(RangeTombstone.Bound.Kind kind) - { - nextKind = kind; - } - - private ByteBuffer[] getValues() - { - return Arrays.copyOfRange(nextValues, 0, size); - } - - private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion) - { - openBound = Slice.Bound.create(kind, getValues()); - openDeletion = deletion.takeAlias(); - } - - private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion) - { - assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was " + deletion; - Slice.Bound closeBound = Slice.Bound.create(kind, getValues()); - Slice slice = reversed - ? Slice.make(closeBound, openBound) - : Slice.make(openBound, closeBound); - addRangeTombstone(slice, openDeletion); - } - - public void writeBoundDeletion(DeletionTime deletion) - { - assert !nextKind.isBoundary(); - if (nextKind.isOpen(reversed)) - open(nextKind, deletion); - else - close(nextKind, deletion); - } - - public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion) - { - assert nextKind.isBoundary(); - DeletionTime closeTime = reversed ? startDeletion : endDeletion; - DeletionTime openTime = reversed ? endDeletion : startDeletion; - - close(nextKind.closeBoundOfBoundary(reversed), closeTime); - open(nextKind.openBoundOfBoundary(reversed), openTime); - } - - public void endOfMarker() - { - clear(); - } - - private void addRangeTombstone(Slice deletionSlice, DeletionTime dt) - { - AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt); - } - - private void clear() - { - size = 0; - Arrays.fill(nextValues, null); - nextKind = null; - } - - public void reset() - { - openBound = null; - openDeletion = null; - clear(); - } - } -}
