http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index 25d1887..0b73292 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -24,27 +24,20 @@ import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.BTreeSearchIterator; import org.apache.cassandra.utils.btree.UpdateFunction; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.concurrent.Locks; import org.apache.cassandra.utils.memory.MemtableAllocator; import org.apache.cassandra.utils.memory.HeapAllocator; import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; -import static org.apache.cassandra.utils.btree.BTree.Dir.desc; /** * A thread-safe and atomic Partition implementation. @@ -54,10 +47,8 @@ import static org.apache.cassandra.utils.btree.BTree.Dir.desc; * other thread can see the state where only parts but not all rows have * been added. 
*/ -public class AtomicBTreePartition implements Partition +public class AtomicBTreePartition extends AbstractBTreePartition { - private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class); - public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"), DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)), null)); @@ -75,6 +66,11 @@ public class AtomicBTreePartition implements Partition private static final int CLOCK_SHIFT = 17; // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms + private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + + private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker"); + private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref"); + /** * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s @@ -85,214 +81,25 @@ public class AtomicBTreePartition implements Partition */ private volatile int wasteTracker = TRACKER_NEVER_WASTED; - private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker"); - - private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); - - private final CFMetaData metadata; - private final DecoratedKey partitionKey; private final MemtableAllocator allocator; - private volatile Holder ref; - private static final 
AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref"); - public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator) { - this.metadata = metadata; - this.partitionKey = partitionKey; + // TODO: is this a potential bug? partition columns may be a subset if we alter columns while it's in memtable + super(metadata, partitionKey, metadata.partitionColumns()); this.allocator = allocator; this.ref = EMPTY; } - public boolean isEmpty() - { - return ref.deletionInfo.isLive() && BTree.isEmpty(ref.tree) && ref.staticRow == null; - } - - public CFMetaData metadata() - { - return metadata; - } - - public DecoratedKey partitionKey() - { - return partitionKey; - } - - public DeletionTime partitionLevelDeletion() - { - return ref.deletionInfo.getPartitionDeletion(); - } - - public PartitionColumns columns() - { - // We don't really know which columns will be part of the update, so assume it's all of them - return metadata.partitionColumns(); - } - - public boolean hasRows() - { - return !BTree.isEmpty(ref.tree); - } - - public EncodingStats stats() - { - return ref.stats; - } - - public Row getRow(Clustering clustering) - { - Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); - // Note that for statics, this will never return null, this will return an empty row. However, - // it's more consistent for this method to return null if we don't really have a static row. - return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? 
null : row; - } - - private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow) - { - DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); - if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive())) - return Rows.EMPTY_STATIC_ROW; - - Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata); - return row == null ? Rows.EMPTY_STATIC_ROW : row; - } - - public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed) - { - // TODO: we could optimize comparison for "NativeRow" à la #6755 - final Holder current = ref; - return new SearchIterator<Clustering, Row>() - { - private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed)); - private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); - - public boolean hasNext() - { - return rawIter.hasNext(); - } - - public Row next(Clustering clustering) - { - if (clustering == Clustering.STATIC_CLUSTERING) - return staticRow(current, columns, true); - - Row row = rawIter.next(clustering); - RangeTombstone rt = current.deletionInfo.rangeCovering(clustering); - - // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to to row - // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion - // to carry the proper deletion on the row. - DeletionTime activeDeletion = partitionDeletion; - if (rt != null && rt.deletionTime().supersedes(activeDeletion)) - activeDeletion = rt.deletionTime(); - - if (row == null) - return activeDeletion.isLive() ? 
null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion); - - return row.filter(columns, activeDeletion, true, metadata); - } - }; - } - - public UnfilteredRowIterator unfilteredIterator() + protected Holder holder() { - return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); + return ref; } - public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) + protected boolean canHaveShadowedData() { - if (slices.size() == 0) - { - Holder current = ref; - DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); - if (selection.fetchedColumns().statics.isEmpty() && partitionDeletion.isLive()) - return UnfilteredRowIterators.emptyIterator(metadata, partitionKey, reversed); - - return new AbstractUnfilteredRowIterator(metadata, - partitionKey, - partitionDeletion, - selection.fetchedColumns(), - staticRow(current, selection, false), - reversed, - current.stats) - { - protected Unfiltered computeNext() - { - return endOfData(); - } - }; - } - - Holder current = ref; - Row staticRow = staticRow(current, selection, false); - return slices.size() == 1 - ? sliceIterator(selection, slices.get(0), reversed, current, staticRow) - : new SlicesIterator(metadata, partitionKey, selection, slices, reversed, current, staticRow); - } - - private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow) - { - Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start(); - Slice.Bound end = slice.end() == Slice.Bound.TOP ? 
null : slice.end(); - Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed)); - - return new RowAndDeletionMergeIterator(metadata, - partitionKey, - current.deletionInfo.getPartitionDeletion(), - selection, - staticRow, - reversed, - current.stats, - rowIter, - current.deletionInfo.rangeIterator(slice, reversed), - true); - } - - public class SlicesIterator extends AbstractUnfilteredRowIterator - { - private final Holder current; - private final ColumnFilter selection; - private final Slices slices; - - private int idx; - private Iterator<Unfiltered> currentSlice; - - private SlicesIterator(CFMetaData metadata, - DecoratedKey key, - ColumnFilter selection, - Slices slices, - boolean isReversed, - Holder holder, - Row staticRow) - { - super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), staticRow, isReversed, holder.stats); - this.current = holder; - this.selection = selection; - this.slices = slices; - } - - protected Unfiltered computeNext() - { - while (true) - { - if (currentSlice == null) - { - if (idx >= slices.size()) - return endOfData(); - - int sliceIdx = isReverseOrder ? 
slices.size() - idx - 1 : idx; - currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW); - idx++; - } - - if (currentSlice.hasNext()) - return currentSlice.next(); - - currentSlice = null; - } - } + return true; } /** @@ -417,28 +224,6 @@ public class AtomicBTreePartition implements Partition return wasteTracker; } - private static final class Holder - { - final DeletionInfo deletionInfo; - // the btree of rows - final Object[] tree; - final Row staticRow; - final EncodingStats stats; - - Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats) - { - this.tree = tree; - this.deletionInfo = deletionInfo; - this.staticRow = staticRow; - this.stats = stats; - } - - Holder with(DeletionInfo info) - { - return new Holder(this.tree, info, this.staticRow, this.stats); - } - } - // the function we provide to the btree utilities to perform any column replacements private static final class RowUpdater implements UpdateFunction<Row, Row> { @@ -455,7 +240,6 @@ public class AtomicBTreePartition implements Partition final MemtableAllocator.DataReclaimer reclaimer; List<Row> inserted; // TODO: replace with walk of aborted BTree - private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) { this.updating = updating;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java new file mode 100644 index 0000000..b122791 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.partitions; + +import java.io.IOException; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.btree.BTree; + +public class CachedBTreePartition extends ImmutableBTreePartition implements CachedPartition +{ + private final int createdAtInSec; + + private final int cachedLiveRows; + private final int rowsWithNonExpiringCells; + + private final int nonTombstoneCellCount; + private final int nonExpiringLiveCells; + + private CachedBTreePartition(CFMetaData metadata, + DecoratedKey partitionKey, + PartitionColumns columns, + Holder holder, + int createdAtInSec, + int cachedLiveRows, + int rowsWithNonExpiringCells, + int nonTombstoneCellCount, + int nonExpiringLiveCells) + { + super(metadata, partitionKey, columns, holder); + this.createdAtInSec = createdAtInSec; + this.cachedLiveRows = cachedLiveRows; + this.rowsWithNonExpiringCells = rowsWithNonExpiringCells; + this.nonTombstoneCellCount = nonTombstoneCellCount; + this.nonExpiringLiveCells = nonExpiringLiveCells; + } + + /** + * Creates a {@code CachedBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. + * @return the created partition. 
+ */ + public static CachedBTreePartition create(UnfilteredRowIterator iterator, int nowInSec) + { + return create(iterator, 16, nowInSec); + } + + /** + * Creates a {@code CachedBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally + * correspond or be a good estimation of the number of rows in {@code iterator}. + * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. + * @return the created partition. + */ + public static CachedBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec) + { + Holder holder = ImmutableBTreePartition.build(iterator, initialRowCapacity); + + int cachedLiveRows = 0; + int rowsWithNonExpiringCells = 0; + int nonTombstoneCellCount = 0; + int nonExpiringLiveCells = 0; + + for (Row row : BTree.<Row>iterable(holder.tree)) + { + if (row.hasLiveData(nowInSec)) + ++cachedLiveRows; + + int nonExpiringLiveCellsThisRow = 0; + for (Cell cell : row.cells()) + { + if (!cell.isTombstone()) + { + ++nonTombstoneCellCount; + if (!cell.isExpiring()) + ++nonExpiringLiveCellsThisRow; + } + } + + if (nonExpiringLiveCellsThisRow > 0) + { + ++rowsWithNonExpiringCells; + nonExpiringLiveCells += nonExpiringLiveCellsThisRow; + } + } + + return new CachedBTreePartition(iterator.metadata(), + iterator.partitionKey(), + iterator.columns(), + holder, + nowInSec, + cachedLiveRows, + rowsWithNonExpiringCells, + nonTombstoneCellCount, + nonExpiringLiveCells); + } + + /** + * The number of rows that were live at the time the partition was cached. + * + * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this. 
+ * + * @return the number of rows in this partition that were live at the time the + * partition was cached (this can be different from the number of live rows now + * due to expiring cells). + */ + public int cachedLiveRows() + { + return cachedLiveRows; + } + + /** + * The number of rows in this cached partition that have at least one non-expiring + * non-deleted cell. + * + * Note that this is generally not a very meaningful number, but this is used by + * {@link DataLimits#hasEnoughLiveData} as an optimization. + * + * @return the number of rows that have at least one non-expiring non-deleted cell. + */ + public int rowsWithNonExpiringCells() + { + return rowsWithNonExpiringCells; + } + + public int nonTombstoneCellCount() + { + return nonTombstoneCellCount; + } + + public int nonExpiringLiveCells() + { + return nonExpiringLiveCells; + } + + static class Serializer implements ISerializer<CachedPartition> + { + public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException + { + int version = MessagingService.current_version; + + assert partition instanceof CachedBTreePartition; + CachedBTreePartition p = (CachedBTreePartition)partition; + + out.writeInt(p.createdAtInSec); + out.writeInt(p.cachedLiveRows); + out.writeInt(p.rowsWithNonExpiringCells); + out.writeInt(p.nonTombstoneCellCount); + out.writeInt(p.nonExpiringLiveCells); + CFMetaData.serializer.serialize(partition.metadata(), out, version); + try (UnfilteredRowIterator iter = p.unfilteredIterator()) + { + UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount()); + } + } + + public CachedPartition deserialize(DataInputPlus in) throws IOException + { + int version = MessagingService.current_version; + + // Note that it would be slightly simpler to just do + // CachedBTreePartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...)); + // However deserializing the header separately is not a lot harder and allows us to: + // 
1) get the capacity of the partition so we can size it properly directly + // 2) saves the creation of a temporary iterator: rows are directly written to the partition, which + // is slightly faster. + + int createdAtInSec = in.readInt(); + int cachedLiveRows = in.readInt(); + int rowsWithNonExpiringCells = in.readInt(); + int nonTombstoneCellCount = in.readInt(); + int nonExpiringLiveCells = in.readInt(); + + + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL); + assert !header.isReversed && header.rowEstimate >= 0; + + Holder holder; + try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header)) + { + holder = ImmutableBTreePartition.build(partition, header.rowEstimate); + } + + return new CachedBTreePartition(metadata, + header.key, + header.sHeader.columns(), + holder, + createdAtInSec, + cachedLiveRows, + rowsWithNonExpiringCells, + nonTombstoneCellCount, + nonExpiringLiveCells); + + } + + public long serializedSize(CachedPartition partition) + { + int version = MessagingService.current_version; + + assert partition instanceof CachedBTreePartition; + CachedBTreePartition p = (CachedBTreePartition)partition; + + try (UnfilteredRowIterator iter = p.unfilteredIterator()) + { + return TypeSizes.sizeof(p.createdAtInSec) + + TypeSizes.sizeof(p.cachedLiveRows) + + TypeSizes.sizeof(p.rowsWithNonExpiringCells) + + TypeSizes.sizeof(p.nonTombstoneCellCount) + + TypeSizes.sizeof(p.nonExpiringLiveCells) + + CFMetaData.serializer.serializedSize(partition.metadata(), version) + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount()); + } + } + } +} + 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/CachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java index dc482f3..33e6ecc 100644 --- a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java @@ -24,15 +24,13 @@ import org.apache.cassandra.io.ISerializer; /** * A partition stored in the partition cache. * - * Note that in practice, the only implementation of this is {@link ArrayBackedPartition}, - * we keep this interface mainly 1) to make it clear what we need from partition in the cache - * (that we don't otherwise) and 2) because {@code ArrayBackedPartition} is used for other - * purpose (than caching) and hence using {@code CachedPartition} when we talk about caching is - * clearer. + * Note that in practice, the only implementation of this is {@link CachedBTreePartition}, + * we keep this interface mainly to make it clear what we need from partition in the cache + * (that we don't otherwise) */ public interface CachedPartition extends Partition, IRowCacheEntry { - public static final ISerializer<CachedPartition> cacheSerializer = new ArrayBackedCachedPartition.Serializer(); + public static final ISerializer<CachedPartition> cacheSerializer = new CachedBTreePartition.Serializer(); /** * The number of {@code Row} objects in this cached partition. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java index 3a57d1a..e0b568d 100644 --- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java @@ -17,24 +17,19 @@ */ package org.apache.cassandra.db.partitions; -import java.util.*; +import java.util.Iterator; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.rows.*; -public class FilteredPartition extends AbstractThreadUnsafePartition +public class FilteredPartition extends ImmutableBTreePartition { - private final Row staticRow; - - private FilteredPartition(CFMetaData metadata, - DecoratedKey partitionKey, - PartitionColumns columns, - Row staticRow, - List<Row> rows) + public FilteredPartition(RowIterator rows) { - super(metadata, partitionKey, columns, rows); - this.staticRow = staticRow; + super(rows.metadata(), rows.partitionKey(), rows.columns(), build(rows, DeletionInfo.LIVE, false, 16)); } /** @@ -45,43 +40,7 @@ public class FilteredPartition extends AbstractThreadUnsafePartition */ public static FilteredPartition create(RowIterator iterator) { - CFMetaData metadata = iterator.metadata(); - boolean reversed = iterator.isReverseOrder(); - - List<Row> rows = new ArrayList<>(); - - while (iterator.hasNext()) - { - Unfiltered unfiltered = iterator.next(); - if (unfiltered.isRow()) - rows.add((Row)unfiltered); - } - - if (reversed) - Collections.reverse(rows); - - return new FilteredPartition(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows); - } - - protected boolean canHaveShadowedData() - { - // We only create instances from RowIterator that don't have shadowed data (nor deletion info really) - return false; - } - - public Row staticRow() - { - return staticRow; - } - - public DeletionInfo deletionInfo() - { - return DeletionInfo.LIVE; - } - - public EncodingStats stats() - { - return EncodingStats.NO_STATS; + return new FilteredPartition(iterator); } public RowIterator rowIterator() @@ -106,7 +65,7 @@ public class FilteredPartition extends AbstractThreadUnsafePartition public DecoratedKey partitionKey() { - return key; + return partitionKey; } public Row staticRow() @@ -114,6 +73,8 @@ public class FilteredPartition extends AbstractThreadUnsafePartition return FilteredPartition.this.staticRow(); } + public void close() {} + public boolean hasNext() { return iter.hasNext(); @@ -124,34 +85,10 @@ public class FilteredPartition extends AbstractThreadUnsafePartition return iter.next(); } - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() + public boolean isEmpty() { + return staticRow().isEmpty() && !hasRows(); } }; } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(); - - sb.append(String.format("[%s.%s] key=%s columns=%s", - metadata.ksName, - metadata.cfName, - metadata.getKeyValidator().getString(partitionKey().getKey()), - columns)); - - if (staticRow() != Rows.EMPTY_STATIC_ROW) - sb.append("\n ").append(staticRow().toString(metadata)); - - for (Row row : this) - sb.append("\n ").append(row.toString(metadata)); - - return sb.toString(); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java new file mode 100644 index 0000000..a13e070 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java @@ -0,0 +1,93 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.rows.*; + +public class ImmutableBTreePartition extends AbstractBTreePartition +{ + + protected final Holder holder; + + public ImmutableBTreePartition(CFMetaData metadata, + DecoratedKey partitionKey, + PartitionColumns columns, + Row staticRow, + Object[] tree, + DeletionInfo deletionInfo, + EncodingStats stats) + { + super(metadata, partitionKey, columns); + this.holder = new Holder(tree, deletionInfo, staticRow, stats); + } + + protected ImmutableBTreePartition(CFMetaData metadata, + DecoratedKey partitionKey, + PartitionColumns columns, + Holder holder) + { + super(metadata, partitionKey, columns); + this.holder = holder; + } + + /** + * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @return the created partition. + */ + public static ImmutableBTreePartition create(UnfilteredRowIterator iterator) + { + return create(iterator, 16); + } + + /** + * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally + * correspond or be a good estimation of the number of rows in {@code iterator}. + * @return the created partition. 
+ */ + public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity) + { + return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), iterator.columns(), + build(iterator, initialRowCapacity)); + } + + protected Holder holder() + { + return holder; + } + + protected boolean canHaveShadowedData() + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index e6d51e5..2f0a4ec 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -43,6 +43,8 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.UpdateFunction; /** * Stores updates made on a partition. @@ -58,7 +60,7 @@ import org.apache.cassandra.utils.Pair; * is also a few static helper constructor methods for special cases ({@code emptyUpdate()}, * {@code fullPartitionDelete} and {@code singleRowUpdate}). 
*/ -public class PartitionUpdate extends AbstractThreadUnsafePartition +public class PartitionUpdate extends AbstractBTreePartition { protected static final Logger logger = LoggerFactory.getLogger(PartitionUpdate.class); @@ -73,28 +75,37 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition private boolean isBuilt; private boolean canReOpen = true; - private final MutableDeletionInfo deletionInfo; - private EncodingStats stats; // will be null if isn't built - - private Row staticRow = Rows.EMPTY_STATIC_ROW; + private Holder holder; + private BTree.Builder<Row> rowBuilder; + private MutableDeletionInfo deletionInfo; private final boolean canHaveShadowedData; private PartitionUpdate(CFMetaData metadata, DecoratedKey key, PartitionColumns columns, - Row staticRow, - List<Row> rows, MutableDeletionInfo deletionInfo, - EncodingStats stats, - boolean isBuilt, + int initialRowCapacity, + boolean canHaveShadowedData) + { + super(metadata, key, columns); + this.deletionInfo = deletionInfo; + this.holder = new Holder(BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + this.canHaveShadowedData = canHaveShadowedData; + rowBuilder = builder(initialRowCapacity); + } + + private PartitionUpdate(CFMetaData metadata, + DecoratedKey key, + PartitionColumns columns, + Holder holder, + MutableDeletionInfo deletionInfo, boolean canHaveShadowedData) { - super(metadata, key, columns, rows); - this.staticRow = staticRow; + super(metadata, key, columns); + this.holder = holder; this.deletionInfo = deletionInfo; - this.stats = stats; - this.isBuilt = isBuilt; + this.isBuilt = true; this.canHaveShadowedData = canHaveShadowedData; } @@ -103,7 +114,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition PartitionColumns columns, int initialRowCapacity) { - this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true); + this(metadata, key, columns, 
MutableDeletionInfo.live(), initialRowCapacity, true); } public PartitionUpdate(CFMetaData metadata, @@ -127,7 +138,9 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key) { - return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false); + MutableDeletionInfo deletionInfo = MutableDeletionInfo.live(); + Holder holder = new Holder(BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + return new PartitionUpdate(metadata, key, PartitionColumns.NONE, holder, deletionInfo, false); } /** @@ -142,7 +155,9 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec) { - return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new MutableDeletionInfo(timestamp, nowInSec), EncodingStats.NO_STATS, true, false); + MutableDeletionInfo deletionInfo = new MutableDeletionInfo(timestamp, nowInSec); + Holder holder = new Holder(BTree.empty(), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + return new PartitionUpdate(metadata, key, PartitionColumns.NONE, holder, deletionInfo, false); } /** @@ -156,9 +171,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row) { - return row.isStatic() - ? 
new PartitionUpdate(metadata, key, new PartitionColumns(row.columns(), Columns.NONE), row, Collections.<Row>emptyList(), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false) - : new PartitionUpdate(metadata, key, new PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, Collections.singletonList(row), MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false); + MutableDeletionInfo deletionInfo = MutableDeletionInfo.live(); + if (row.isStatic()) + { + Holder holder = new Holder(BTree.empty(), deletionInfo, row, EncodingStats.NO_STATS); + return new PartitionUpdate(metadata, key, new PartitionColumns(row.columns(), Columns.NONE), holder, deletionInfo, false); + } + else + { + Holder holder = new Holder(BTree.singleton(row), deletionInfo, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + return new PartitionUpdate(metadata, key, new PartitionColumns(Columns.NONE, row.columns()), holder, deletionInfo, false); + } } /** @@ -183,47 +206,16 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator) { - CFMetaData metadata = iterator.metadata(); - boolean reversed = iterator.isReverseOrder(); - - List<Row> rows = new ArrayList<>(); - MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); - - while (iterator.hasNext()) - { - Unfiltered unfiltered = iterator.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - rows.add((Row)unfiltered); - else - deletionBuilder.add((RangeTombstoneMarker)unfiltered); - } - - if (reversed) - Collections.reverse(rows); - - return new PartitionUpdate(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), iterator.stats(), true, false); + Holder holder = build(iterator, 16); + MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo; + return new 
PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), holder, deletionInfo, false); } public static PartitionUpdate fromIterator(RowIterator iterator) { - CFMetaData metadata = iterator.metadata(); - boolean reversed = iterator.isReverseOrder(); - - List<Row> rows = new ArrayList<>(); - - EncodingStats.Collector collector = new EncodingStats.Collector(); - - while (iterator.hasNext()) - { - Row row = iterator.next(); - rows.add(row); - Rows.collectStats(row, collector); - } - - if (reversed) - Collections.reverse(rows); - - return new PartitionUpdate(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, MutableDeletionInfo.live(), collector.get(), true, false); + MutableDeletionInfo deletionInfo = MutableDeletionInfo.live(); + Holder holder = build(iterator, deletionInfo, true, 16); + return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), holder, deletionInfo, false); } protected boolean canHaveShadowedData() @@ -231,16 +223,6 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition return canHaveShadowedData; } - public Row staticRow() - { - return staticRow; - } - - public DeletionInfo deletionInfo() - { - return deletionInfo; - } - /** * Deserialize a partition update from a provided byte buffer. * @@ -312,7 +294,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition * * @return a partition update that include (merge) all the updates from {@code updates}. 
*/ - public static PartitionUpdate merge(Collection<PartitionUpdate> updates) + public static PartitionUpdate merge(List<PartitionUpdate> updates) { assert !updates.isEmpty(); final int size = updates.size(); @@ -320,72 +302,9 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition if (size == 1) return Iterables.getOnlyElement(updates); - // Used when merging row to decide of liveness - int nowInSec = FBUtilities.nowInSeconds(); - - PartitionColumns.Builder builder = PartitionColumns.builder(); - DecoratedKey key = null; - CFMetaData metadata = null; - MutableDeletionInfo deletion = MutableDeletionInfo.live(); - Row staticRow = Rows.EMPTY_STATIC_ROW; - List<Iterator<Row>> updateRowIterators = new ArrayList<>(size); - EncodingStats stats = EncodingStats.NO_STATS; - - for (PartitionUpdate update : updates) - { - builder.addAll(update.columns()); - deletion.add(update.deletionInfo()); - if (!update.staticRow().isEmpty()) - staticRow = staticRow == Rows.EMPTY_STATIC_ROW ? update.staticRow() : Rows.merge(staticRow, update.staticRow(), nowInSec); - updateRowIterators.add(update.iterator()); - stats = stats.mergeWith(update.stats()); - - if (key == null) - key = update.partitionKey(); - else - assert key.equals(update.partitionKey()); - - if (metadata == null) - metadata = update.metadata(); - else - assert metadata.cfId.equals(update.metadata().cfId); - } - - PartitionColumns columns = builder.build(); - - final Row.Merger merger = new Row.Merger(size, nowInSec, columns.regulars); - - Iterator<Row> merged = MergeIterator.get(updateRowIterators, metadata.comparator, new MergeIterator.Reducer<Row, Row>() - { - @Override - public boolean trivialReduceIsTrivial() - { - return true; - } - - public void reduce(int idx, Row current) - { - merger.add(idx, current); - } - - protected Row getReduced() - { - // Note that while merger.getRow() can theoretically return null, it won't in this case because - // we don't pass an "activeDeletion". 
- return merger.merge(DeletionTime.LIVE); - } - - @Override - protected void onKeyChange() - { - merger.clear(); - } - }); - - List<Row> rows = new ArrayList<>(); - Iterators.addAll(rows, merged); - - return new PartitionUpdate(metadata, key, columns, staticRow, rows, deletion, stats, true, true); + int nowInSecs = FBUtilities.nowInSeconds(); + List<UnfilteredRowIterator> asIterators = Lists.transform(updates, AbstractBTreePartition::unfilteredIterator); + return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs)); } /** @@ -404,17 +323,12 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public void updateAllTimestamp(long newTimestamp) { - // We know we won't be updating that update again after this call, and doing is post built is potentially - // slightly more efficient (things are more "compact"). So force a build if it hasn't happened yet. - maybeBuild(); - + Holder holder = holder(); deletionInfo.updateAllTimestamp(newTimestamp - 1); - - if (!staticRow.isEmpty()) - staticRow = staticRow.updateAllTimestamp(newTimestamp); - - for (int i = 0; i < rows.size(); i++) - rows.set(i, rows.get(i).updateAllTimestamp(newTimestamp)); + Object[] tree = BTree.<Row>transformAndFilter(holder.tree, (x) -> x.updateAllTimestamp(newTimestamp)); + Row staticRow = holder.staticRow.updateAllTimestamp(newTimestamp); + EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.<Row>iterator(tree), deletionInfo); + this.holder = new Holder(tree, deletionInfo, staticRow, newStats); } /** @@ -427,7 +341,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition */ public int operationCount() { - return rows.size() + return rowCount() + deletionInfo.rangeCount() + (deletionInfo.getPartitionDeletion().isLive() ? 
0 : 1); } @@ -449,17 +363,15 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition return size; } - @Override - public int rowCount() + protected Holder holder() { maybeBuild(); - return super.rowCount(); + return holder; } public EncodingStats stats() { - maybeBuild(); - return stats; + return holder().stats; } /** @@ -480,6 +392,15 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition // called concurrently with sort() (which should be avoided in the first place, but // better safe than sorry). isBuilt = false; + if (rowBuilder == null) + rowBuilder = builder(16); + } + + private BTree.Builder<Row> builder(int initialCapacity) + { + return BTree.<Row>builder(metadata.comparator, initialCapacity) + .setQuickResolver((a, b) -> + Rows.merge(a, b, createdAtInSec)); } /** @@ -565,7 +486,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition canReOpen = false; List<CounterMark> l = new ArrayList<>(); - for (Row row : rows) + for (Row row : this) { for (Cell cell : row.cells()) { @@ -616,28 +537,19 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition { // We test for == first because in most case it'll be true and that is faster assert columns().statics == row.columns() || columns().statics.contains(row.columns()); - staticRow = staticRow.isEmpty() + Row staticRow = holder.staticRow.isEmpty() ? row - : Rows.merge(staticRow, row, createdAtInSec); + : Rows.merge(holder.staticRow, row, createdAtInSec); + holder = new Holder(holder.tree, holder.deletionInfo, staticRow, holder.stats); } else { // We test for == first because in most case it'll be true and that is faster assert columns().regulars == row.columns() || columns().regulars.contains(row.columns()); - rows.add(row); + rowBuilder.add(row); } } - /** - * The number of rows contained in this update. - * - * @return the number of rows contained in this update. 
- */ - public int size() - { - return rows.size(); - } - private void maybeBuild() { if (isBuilt) @@ -651,53 +563,17 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition if (isBuilt) return; - if (rows.size() <= 1) - { - finishBuild(); - return; - } + Holder holder = this.holder; + Object[] cur = holder.tree; + Object[] add = rowBuilder.build(); + Object[] merged = BTree.<Row>merge(cur, add, metadata.comparator, + UpdateFunction.Simple.of((a, b) -> Rows.merge(a, b, createdAtInSec))); - Comparator<Row> comparator = metadata.comparator.rowComparator(); - // Sort the rows. Because the same row can have been added multiple times, we can still have duplicates after that - Collections.sort(rows, comparator); + assert deletionInfo == holder.deletionInfo; + EncodingStats newStats = EncodingStats.Collector.collect(holder.staticRow, BTree.<Row>iterator(merged), deletionInfo); - // Now find the duplicates and merge them together - int previous = 0; // The last element that was set - for (int current = 1; current < rows.size(); current++) - { - // There is really only 2 possible comparison: < 0 or == 0 since we've sorted already - Row previousRow = rows.get(previous); - Row currentRow = rows.get(current); - int cmp = comparator.compare(previousRow, currentRow); - if (cmp == 0) - { - // current and previous are the same row. Merge current into previous - // (and so previous + 1 will be "free"). 
- rows.set(previous, Rows.merge(previousRow, currentRow, createdAtInSec)); - } - else - { - // current != previous, so move current just after previous if needs be - ++previous; - if (previous != current) - rows.set(previous, currentRow); - } - } - - // previous is on the last value to keep - for (int j = rows.size() - 1; j > previous; j--) - rows.remove(j); - - finishBuild(); - } - - private void finishBuild() - { - EncodingStats.Collector collector = new EncodingStats.Collector(); - deletionInfo.collectStats(collector); - for (Row row : rows) - Rows.collectStats(row, collector); - stats = collector.get(); + this.holder = new Holder(merged, holder.deletionInfo, holder.staticRow, newStats); + rowBuilder = null; isBuilt = true; } @@ -716,7 +592,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition else { CFMetaData.serializer.serialize(update.metadata(), out, version); - UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rows.size()); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount()); } } } @@ -760,7 +636,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition assert header.rowEstimate >= 0; MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false); - List<Row> rows = new ArrayList<>(header.rowEstimate); + BTree.Builder<Row> rows = BTree.builder(metadata.comparator, header.rowEstimate); + rows.auto(false); try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, flag, header)) { @@ -774,14 +651,12 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition } } + MutableDeletionInfo deletionInfo = deletionBuilder.build(); return new PartitionUpdate(metadata, header.key, header.sHeader.columns(), - header.staticRow, - rows, - deletionBuilder.build(), - header.sHeader.stats(), - true, + new Holder(rows.build(), 
deletionInfo, header.staticRow, header.sHeader.stats()), + deletionInfo, false); } @@ -802,7 +677,7 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition return LegacyLayout.serializedSizeAsLegacyPartition(iter, version); return CFMetaData.serializer.serializedSize(update.metadata(), version) - + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rows.size()); + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount()); } } } @@ -851,8 +726,8 @@ public class PartitionUpdate extends AbstractThreadUnsafePartition { // This is a bit of a giant hack as this is the only place where we mutate a Row object. This makes it more efficient // for counters however and this won't be needed post-#6506 so that's probably fine. - assert row instanceof BTreeBackedRow; - ((BTreeBackedRow)row).setValue(column, path, value); + assert row instanceof BTreeRow; + ((BTreeRow)row).setValue(column, path, value); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java deleted file mode 100644 index 548fb82..0000000 --- a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java +++ /dev/null @@ -1,602 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.nio.ByteBuffer; -import java.util.*; -import java.util.function.Predicate; - -import com.google.common.base.Function; -import com.google.common.collect.Iterators; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.utils.AbstractIterator; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.ObjectSizes; -import org.apache.cassandra.utils.btree.BTree; -import org.apache.cassandra.utils.btree.BTreeSearchIterator; -import org.apache.cassandra.utils.btree.UpdateFunction; - -/** - * Immutable implementation of a Row object. - */ -public class BTreeBackedRow extends AbstractRow -{ - private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY)); - - private final Clustering clustering; - private final Columns columns; - private final LivenessInfo primaryKeyLivenessInfo; - private final DeletionTime deletion; - - // The data for each columns present in this row in column sorted order. - private final Object[] btree; - - // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove - // all tombstone since we don't return them to the client) as well as on compaction. 
But it's likely that many rows won't have any tombstone at all, so - // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones, - // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given - // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to - // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and - // no expiring cells, this will be Integer.MAX_VALUE; - private final int minLocalDeletionTime; - - private BTreeBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime) - { - this.clustering = clustering; - this.columns = columns; - this.primaryKeyLivenessInfo = primaryKeyLivenessInfo; - this.deletion = deletion; - this.btree = btree; - this.minLocalDeletionTime = minLocalDeletionTime; - } - - // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases. 
- public static BTreeBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree) - { - int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); - if (minDeletionTime != Integer.MIN_VALUE) - { - for (ColumnData cd : BTree.<ColumnData>iterable(btree)) - minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); - } - - return new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); - } - - public static BTreeBackedRow emptyRow(Clustering clustering) - { - return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE); - } - - public static BTreeBackedRow singleCellRow(Clustering clustering, Cell cell) - { - if (cell.column().isSimple()) - return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell)); - - ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); - return new BTreeBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell)); - } - - public static BTreeBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion) - { - assert !deletion.isLive(); - return new BTreeBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE); - } - - public static BTreeBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo) - { - assert !primaryKeyLivenessInfo.isEmpty(); - return new BTreeBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo)); - } - - private static int minDeletionTime(Cell cell) - { - return cell.isTombstone() ? 
Integer.MIN_VALUE : cell.localDeletionTime(); - } - - private static int minDeletionTime(LivenessInfo info) - { - return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE; - } - - private static int minDeletionTime(DeletionTime dt) - { - return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE; - } - - private static int minDeletionTime(ComplexColumnData cd) - { - int min = minDeletionTime(cd.complexDeletion()); - for (Cell cell : cd) - { - min = Math.min(min, minDeletionTime(cell)); - if (min == Integer.MIN_VALUE) - break; - } - return min; - } - - private static int minDeletionTime(ColumnData cd) - { - return cd.column().isSimple() ? minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd); - } - - private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion) - { - int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)); - for (ColumnData cd : BTree.<ColumnData>iterable(btree)) - { - min = Math.min(min, minDeletionTime(cd)); - if (min == Integer.MIN_VALUE) - break; - } - return min; - } - - public Clustering clustering() - { - return clustering; - } - - public Columns columns() - { - return columns; - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return primaryKeyLivenessInfo; - } - - public boolean isEmpty() - { - return primaryKeyLivenessInfo().isEmpty() - && deletion().isLive() - && BTree.isEmpty(btree); - } - - public DeletionTime deletion() - { - return deletion; - } - - public Cell getCell(ColumnDefinition c) - { - assert !c.isComplex(); - return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); - } - - public Cell getCell(ColumnDefinition c, CellPath path) - { - assert c.isComplex(); - ComplexColumnData cd = getComplexColumnData(c); - if (cd == null) - return null; - return cd.getCell(path); - } - - public ComplexColumnData getComplexColumnData(ColumnDefinition c) - { - assert c.isComplex(); - return (ComplexColumnData) 
BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); - } - - public Iterator<ColumnData> iterator() - { - return searchIterator(); - } - - public Iterable<Cell> cells() - { - return CellIterator::new; - } - - public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator() - { - return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC); - } - - public Row filter(ColumnFilter filter, CFMetaData metadata) - { - return filter(filter, DeletionTime.LIVE, false, metadata); - } - - public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata) - { - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); - - if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty()) - return this; - - boolean mayHaveShadowed = activeDeletion.supersedes(deletion); - - LivenessInfo newInfo = primaryKeyLivenessInfo; - DeletionTime newDeletion = deletion; - if (mayHaveShadowed) - { - if (activeDeletion.deletes(newInfo.timestamp())) - newInfo = LivenessInfo.EMPTY; - // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow, - // the row deletion is shadowed and we shouldn't return it. - newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE; - } - - Columns columns = filter.fetchedColumns().columns(isStatic()); - Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester(); - return transformAndFilter(newInfo, newDeletion, (cd) -> { - - ColumnDefinition column = cd.column(); - if (!inclusionTester.test(column)) - return null; - - CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes); - if (column.isComplex()) - return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? 
activeDeletion : DeletionTime.LIVE, dropped); - - Cell cell = (Cell)cd; - return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)) - ? cell : null; - }); - } - - public boolean hasComplexDeletion() - { - // We start by the end cause we know complex columns sort before simple ones - for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC)) - { - if (cd.column().isSimple()) - return false; - - if (!((ComplexColumnData)cd).complexDeletion().isLive()) - return true; - } - return false; - } - - public Row markCounterLocalToBeCleared() - { - return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter() - ? cd.markCounterLocalToBeCleared() - : cd); - } - - public boolean hasDeletion(int nowInSec) - { - return nowInSec >= minLocalDeletionTime; - } - - /** - * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and - * all deletion timestamp by {@code newTimestamp - 1}. - * - * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details. - */ - public Row updateAllTimestamp(long newTimestamp) - { - LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp); - DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime()); - - return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp)); - } - - public Row purge(DeletionPurger purger, int nowInSec) - { - if (!hasDeletion(nowInSec)) - return this; - - LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo; - DeletionTime newDeletion = purger.shouldPurge(deletion) ? 
DeletionTime.LIVE : deletion; - - return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec)); - } - - private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function) - { - Object[] transformed = BTree.transformAndFilter(btree, function); - - if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion) - return this; - - if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed)) - return null; - - int minDeletionTime = minDeletionTime(transformed, info, deletion); - return new BTreeBackedRow(clustering, columns, info, deletion, transformed, minDeletionTime); - } - - public int dataSize() - { - int dataSize = clustering.dataSize() - + primaryKeyLivenessInfo.dataSize() - + deletion.dataSize(); - - for (ColumnData cd : this) - dataSize += cd.dataSize(); - return dataSize; - } - - public long unsharedHeapSizeExcludingData() - { - long heapSize = EMPTY_SIZE - + clustering.unsharedHeapSizeExcludingData() - + BTree.sizeOfStructureOnHeap(btree); - - for (ColumnData cd : this) - heapSize += cd.unsharedHeapSizeExcludingData(); - return heapSize; - } - - public static Row.Builder sortedBuilder(Columns columns) - { - return new Builder(columns, true); - } - - public static Row.Builder unsortedBuilder(Columns columns, int nowInSec) - { - return new Builder(columns, false, nowInSec); - } - - // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general - // assumption that Row objects are immutable. This method should go away post-#6506 in particular. - // This method is in particular not exposed by the Row API on purpose. - // This method also *assumes* that the cell we're setting already exists. 
- public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value) - { - ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column); - if (column.isSimple()) - BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value)); - else - ((ComplexColumnData) current).setValue(path, value); - } - - public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata) - { - return () -> new CellInLegacyOrderIterator(metadata); - } - - private class CellIterator extends AbstractIterator<Cell> - { - private Iterator<ColumnData> columnData = iterator(); - private Iterator<Cell> complexCells; - - protected Cell computeNext() - { - while (true) - { - if (complexCells != null) - { - if (complexCells.hasNext()) - return complexCells.next(); - - complexCells = null; - } - - if (!columnData.hasNext()) - return endOfData(); - - ColumnData cd = columnData.next(); - if (cd.column().isComplex()) - complexCells = ((ComplexColumnData)cd).iterator(); - else - return (Cell)cd; - } - } - } - - private class CellInLegacyOrderIterator extends AbstractIterator<Cell> - { - private final AbstractType<?> comparator; - private final int firstComplexIdx; - private int simpleIdx; - private int complexIdx; - private Iterator<Cell> complexCells; - private final Object[] data; - - private CellInLegacyOrderIterator(CFMetaData metadata) - { - this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR); - - // copy btree into array for simple separate iteration of simple and complex columns - this.data = new Object[BTree.size(btree)]; - BTree.toArray(btree, data, 0); - - int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData); - this.firstComplexIdx = idx < 0 ? 
data.length : idx; - this.complexIdx = firstComplexIdx; - } - - protected Cell computeNext() - { - while (true) - { - if (complexCells != null) - { - if (complexCells.hasNext()) - return complexCells.next(); - - complexCells = null; - } - - if (simpleIdx >= firstComplexIdx) - { - if (complexIdx >= data.length) - return endOfData(); - - complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); - } - else - { - if (complexIdx >= data.length) - return (Cell)data[simpleIdx++]; - - if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0) - return (Cell)data[simpleIdx++]; - else - complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); - } - } - } - } - - public static class Builder implements Row.Builder - { - // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time - private static class ComplexColumnDeletion extends BufferCell - { - public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime) - { - super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM); - } - } - - // converts a run of Cell with equal column into a ColumnData - private static class CellResolver implements BTree.Builder.Resolver - { - final int nowInSec; - private CellResolver(int nowInSec) - { - this.nowInSec = nowInSec; - } - - public ColumnData resolve(Object[] cells, int lb, int ub) - { - Cell cell = (Cell) cells[lb]; - ColumnDefinition column = cell.column; - if (cell.column.isSimple()) - { - assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE; - while (++lb < ub) - cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec); - return cell; - } - - // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are - // bedded in, as less important; galloping makes it pretty cheap anyway) - Arrays.sort(cells, lb, ub, 
(Comparator<Object>) column.cellComparator()); - cell = (Cell) cells[lb]; - DeletionTime deletion = DeletionTime.LIVE; - if (cell instanceof ComplexColumnDeletion) - { - // TODO: do we need to be robust to multiple of these being provided? - deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime()); - lb++; - } - - List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub); - Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp()); - return new ComplexColumnData(column, btree, deletion); - } - - }; - protected final Columns columns; - - protected Clustering clustering; - protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY; - protected DeletionTime deletion = DeletionTime.LIVE; - - private final boolean isSorted; - private final BTree.Builder<Cell> cells; - private final CellResolver resolver; - private boolean hasComplex = false; - - // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion. - - protected Builder(Columns columns, boolean isSorted) - { - this(columns, isSorted, Integer.MIN_VALUE); - } - - protected Builder(Columns columns, boolean isSorted, int nowInSecs) - { - this.columns = columns; - this.cells = BTree.builder(ColumnData.comparator); - resolver = new CellResolver(nowInSecs); - this.isSorted = isSorted; - this.cells.auto(false); - } - - public boolean isSorted() - { - return isSorted; - } - - public void newRow(Clustering clustering) - { - assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before - this.clustering = clustering; - } - - public Clustering clustering() - { - return clustering; - } - - protected void reset() - { - this.clustering = null; - this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; - this.deletion = DeletionTime.LIVE; - this.cells.reuse(); - } - - public void addPrimaryKeyLivenessInfo(LivenessInfo info) - { - this.primaryKeyLivenessInfo = info; - } - - public void addRowDeletion(DeletionTime deletion) 
- { - this.deletion = deletion; - } - - public void addCell(Cell cell) - { - assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; - cells.add(cell); - hasComplex |= cell.column.isComplex(); - } - - public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) - { - cells.add(new ComplexColumnDeletion(column, complexDeletion)); - hasComplex = true; - } - - public Row build() - { - if (!isSorted) - cells.sort(); - // we can avoid resolving if we're sorted and have no complex values - // (because we'll only have unique simple cells, which are already in their final condition) - if (!isSorted | hasComplex) - cells.resolve(resolver); - Object[] btree = cells.build(); - int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion); - Row row = new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); - reset(); - return row; - } - - } -}
