http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java new file mode 100644 index 0000000..4edd707 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -0,0 +1,819 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; +import org.apache.cassandra.utils.btree.UpdateFunction; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.Locks; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.apache.cassandra.utils.memory.HeapAllocator; +import org.apache.cassandra.service.StorageService; +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; + +/** + * A thread-safe and atomic Partition implementation. + * + * Operations (in particular addAll) on this implementation are atomic and + * isolated (in the sense of ACID). Typically a addAll is guaranteed that no + * other thread can see the state where only parts but not all rows have + * been added. + */ +public class AtomicBTreePartition implements Partition +{ + private static final Logger logger = LoggerFactory.getLogger(AtomicBTreePartition.class); + + public static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"), + StorageService.getPartitioner().decorateKey(ByteBuffer.allocate(1)), + null)); + + // Reserved values for wasteTracker field. 
These values must not be consecutive (see avoidReservedValues) + private static final int TRACKER_NEVER_WASTED = 0; + private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE; + + // The granularity with which we track wasted allocation/work; we round up + private static final int ALLOCATION_GRANULARITY_BYTES = 1024; + // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below) + private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L; + private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES); + // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior + private static final int CLOCK_SHIFT = 17; + // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms + + /** + * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by + * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s + * + * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes + * we increment the current value if it is within this window, and set it to the min of the window plus our waste + * otherwise. + */ + private volatile int wasteTracker = TRACKER_NEVER_WASTED; + + private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker"); + + private static final DeletionInfo LIVE = DeletionInfo.live(); + // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class, + // so we can safely alias one DeletionInfo.live() reference and avoid some allocations. + private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS); + + private final CFMetaData metadata; + private final DecoratedKey partitionKey; + private final MemtableAllocator allocator; + + private volatile Holder ref; + + private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, Holder.class, "ref"); + + public AtomicBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, MemtableAllocator allocator) + { + this.metadata = metadata; + this.partitionKey = partitionKey; + this.allocator = allocator; + this.ref = EMPTY; + } + + public boolean isEmpty() + { + return ref.deletionInfo.isLive() && BTree.isEmpty(ref.tree) && ref.staticRow == null; + } + + public CFMetaData metadata() + { + return metadata; + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public DeletionTime partitionLevelDeletion() + { + return ref.deletionInfo.getPartitionDeletion(); + } + + public PartitionColumns columns() + { + // We don't really know which columns will be part of the update, so assume it's all of them + return metadata.partitionColumns(); + } + + public boolean hasRows() + { + return !BTree.isEmpty(ref.tree); + } + + public RowStats stats() + { + return ref.stats; + } + + public Row getRow(Clustering clustering) + { + Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); + // Note that for statics, this will never return null, this will return an empty row. However, + // it's more consistent for this method to return null if we don't really have a static row. 
+ return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; + } + + public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed) + { + // TODO: we could optimize comparison for "NativeRow" à la #6755 + final Holder current = ref; + return new SearchIterator<Clustering, Row>() + { + private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed); + private final MemtableRowData.ReusableRow row = allocator.newReusableRow(); + private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns); + private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt(); + + public boolean hasNext() + { + return rawIter.hasNext(); + } + + public Row next(Clustering key) + { + if (key == Clustering.STATIC_CLUSTERING) + return makeStatic(columns, current, allocator); + + MemtableRowData data = rawIter.next(key); + // We also need to find if there is a range tombstone covering this key + RangeTombstone rt = current.deletionInfo.rangeCovering(key); + + if (data == null) + { + // If we have a range tombstone but no data, "fake" the RT by returning a row deletion + // corresponding to the tombstone. + if (rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion) + return filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, rt.deletionTime())); + return null; + } + + row.setTo(data); + + filter.setRowDeletion(null); + if (rt == null || rt.deletionTime().markedForDeleteAt() < partitionDeletion) + { + filter.setDeletionTimestamp(partitionDeletion); + } + else + { + filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt()); + // If we have a range tombstone covering that row and it supersedes the row deletion itself, then + // we replace the row deletion by the tombstone deletion as a way to return the tombstone.
+ if (rt.deletionTime().supersedes(row.deletion())) + filter.setRowDeletion(rt.deletionTime()); + } + + return filter.setTo(row); + } + }; + } + + private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion) + { + return new AbstractRow() + { + public Columns columns() + { + return Columns.NONE; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return LivenessInfo.NONE; + } + + public DeletionTime deletion() + { + return deletion; + } + + public boolean isEmpty() + { + return true; + } + + public boolean hasComplexDeletion() + { + return false; + } + + public Clustering clustering() + { + return clustering; + } + + public Cell getCell(ColumnDefinition c) + { + return null; + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + return null; + } + + public Iterator<Cell> getCells(ColumnDefinition c) + { + return null; + } + + public DeletionTime getDeletion(ColumnDefinition c) + { + return DeletionTime.LIVE; + } + + public Iterator<Cell> iterator() + { + return Iterators.<Cell>emptyIterator(); + } + + public SearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return new SearchIterator<ColumnDefinition, ColumnData>() + { + public boolean hasNext() + { + return false; + } + + public ColumnData next(ColumnDefinition column) + { + return null; + } + }; + } + + public Row takeAlias() + { + return this; + } + }; + } + + public UnfilteredRowIterator unfilteredIterator() + { + return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); + } + + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) + { + if (slices.size() == 0) + { + Holder current = ref; + DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); + if (selection.fetchedColumns().statics.isEmpty() && partitionDeletion.isLive()) + return UnfilteredRowIterators.emptyIterator(metadata, partitionKey, reversed); + + return new AbstractUnfilteredRowIterator(metadata, + partitionKey, + partitionDeletion, + selection.fetchedColumns(), + makeStatic(selection, current, allocator), + reversed, + current.stats) + { + protected Unfiltered computeNext() + { + return endOfData(); + } + }; + } + + return slices.size() == 1 + ? new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator) + : new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator); + } + + private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator) + { + Columns statics = selection.fetchedColumns().statics; + if (statics.isEmpty() || holder.staticRow == null) + return Rows.EMPTY_STATIC_ROW; + + return new ReusableFilteringRow(statics, selection) + .setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt()) + .setTo(allocator.newReusableRow().setTo(holder.staticRow)); + } + + private static class ReusableFilteringRow extends FilteringRow + { + private final Columns columns; + private final ColumnFilter selection; + private ColumnFilter.Tester tester; + private long deletionTimestamp; + + // Used by searchIterator in case the row is covered by a tombstone. 
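+ // When non-null, this deletion overrides the one of the row being filtered (see deletion() below).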
+ private DeletionTime rowDeletion; + + public ReusableFilteringRow(Columns columns, ColumnFilter selection) + { + this.columns = columns; + this.selection = selection; + } + + public ReusableFilteringRow setDeletionTimestamp(long timestamp) + { + this.deletionTimestamp = timestamp; + return this; + } + + public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion) + { + this.rowDeletion = rowDeletion; + return this; + } + + @Override + public DeletionTime deletion() + { + return rowDeletion == null ? super.deletion() : rowDeletion; + } + + @Override + protected boolean include(LivenessInfo info) + { + return info.timestamp() > deletionTimestamp; + } + + @Override + protected boolean include(ColumnDefinition def) + { + return columns.contains(def); + } + + @Override + protected boolean include(DeletionTime dt) + { + return dt.markedForDeleteAt() > deletionTimestamp; + } + + @Override + protected boolean include(ColumnDefinition c, DeletionTime dt) + { + return dt.markedForDeleteAt() > deletionTimestamp; + } + + @Override + protected boolean include(Cell cell) + { + return selection.includes(cell); + } + } + + private static class SingleSliceIterator extends AbstractUnfilteredRowIterator + { + private final Iterator<Unfiltered> iterator; + private final ReusableFilteringRow row; + + private SingleSliceIterator(CFMetaData metadata, + DecoratedKey key, + Holder holder, + ColumnFilter selection, + Slice slice, + boolean isReversed, + MemtableAllocator allocator) + { + super(metadata, + key, + holder.deletionInfo.getPartitionDeletion(), + selection.fetchedColumns(), + makeStatic(selection, holder, allocator), + isReversed, + holder.stats); + + Iterator<Row> rowIter = rowIter(metadata, + holder, + slice, + !isReversed, + allocator); + + this.iterator = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed) + .setTo(rowIter, holder.deletionInfo.rangeIterator(slice, isReversed)); + + this.row = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection) + .setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt()); + } + + private Iterator<Row> rowIter(CFMetaData metadata, + Holder holder, + Slice slice, + boolean forwards, + final MemtableAllocator allocator) + { + Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start(); + Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end(); + final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards); + return new AbstractIterator<Row>() + { + private final MemtableRowData.ReusableRow row = allocator.newReusableRow(); + + protected Row computeNext() + { + return dataIter.hasNext() ? 
row.setTo(dataIter.next()) : endOfData(); + } + }; + } + + protected Unfiltered computeNext() + { + while (iterator.hasNext()) + { + Unfiltered next = iterator.next(); + if (next.kind() == Unfiltered.Kind.ROW) + { + row.setTo((Row)next); + if (!row.isEmpty()) + return row; + } + else + { + RangeTombstoneMarker marker = (RangeTombstoneMarker)next; + + long deletion = partitionLevelDeletion().markedForDeleteAt(); + if (marker.isOpen(isReverseOrder())) + deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt()); + row.setDeletionTimestamp(deletion); + return marker; + } + } + return endOfData(); + } + } + + public static class SlicesIterator extends AbstractUnfilteredRowIterator + { + private final Holder holder; + private final MemtableAllocator allocator; + private final ColumnFilter selection; + private final Slices slices; + + private int idx; + private UnfilteredRowIterator currentSlice; + + private SlicesIterator(CFMetaData metadata, + DecoratedKey key, + Holder holder, + ColumnFilter selection, + Slices slices, + boolean isReversed, + MemtableAllocator allocator) + { + super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), makeStatic(selection, holder, allocator), isReversed, holder.stats); + this.holder = holder; + this.selection = selection; + this.allocator = allocator; + this.slices = slices; + } + + protected Unfiltered computeNext() + { + while (true) + { + if (currentSlice == null) + { + if (idx >= slices.size()) + return endOfData(); + + int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx; + currentSlice = new SingleSliceIterator(metadata, + partitionKey, + holder, + selection, + slices.get(sliceIdx), + isReverseOrder, + allocator); + idx++; + } + + if (currentSlice.hasNext()) + return currentSlice.next(); + + currentSlice = null; + } + } + } + + /** + * Adds a given update to this in-memtable partition. + * + * @return an array containing first the difference in size seen after merging the updates, and second the minimum + * time detla between updates. + */ + public long[] addAllWithSizeDelta(final PartitionUpdate update, OpOrder.Group writeOp, Updater indexer) + { + RowUpdater updater = new RowUpdater(this, allocator, writeOp, indexer); + DeletionInfo inputDeletionInfoCopy = null; + + boolean monitorOwned = false; + try + { + if (usePessimisticLocking()) + { + Locks.monitorEnterUnsafe(this); + monitorOwned = true; + } + while (true) + { + Holder current = ref; + updater.ref = current; + updater.reset(); + + DeletionInfo deletionInfo; + if (update.deletionInfo().mayModify(current.deletionInfo)) + { + if (inputDeletionInfoCopy == null) + inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance); + + deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); + updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); + } + else + { + deletionInfo = current.deletionInfo; + } + + Row newStatic = update.staticRow(); + MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW + ? current.staticRow + : (current.staticRow == null ? 
updater.apply(newStatic) : updater.apply(current.staticRow, newStatic)); + Object[] tree = BTree.<Clusterable, Row, MemtableRowData>update(current.tree, update.metadata().comparator, update, update.rowCount(), updater); + RowStats newStats = current.stats.mergeWith(update.stats()); + + if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo, staticRow, newStats))) + { + indexer.updateRowLevelIndexes(); + updater.finish(); + return new long[]{ updater.dataSize, updater.colUpdateTimeDelta }; + } + else if (!monitorOwned) + { + boolean shouldLock = usePessimisticLocking(); + if (!shouldLock) + { + shouldLock = updateWastedAllocationTracker(updater.heapSize); + } + if (shouldLock) + { + Locks.monitorEnterUnsafe(this); + monitorOwned = true; + } + } + } + } + finally + { + if (monitorOwned) + Locks.monitorExitUnsafe(this); + } + + } + + public boolean usePessimisticLocking() + { + return wasteTracker == TRACKER_PESSIMISTIC_LOCKING; + } + + /** + * Update the wasted allocation tracker state based on newly wasted allocation information + * + * @param wastedBytes the number of bytes wasted by this thread + * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached + */ + private boolean updateWastedAllocationTracker(long wastedBytes) + { + // Early check for huge allocation that exceeds the limit + if (wastedBytes < EXCESS_WASTE_BYTES) + { + // We round up to ensure work < granularity are still accounted for + int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES; + + int oldTrackerValue; + while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker)) + { + // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap) + int time = (int) (System.nanoTime() >>> CLOCK_SHIFT); + int delta = oldTrackerValue - time; + if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET) + delta = -EXCESS_WASTE_OFFSET; + delta += wastedAllocation; + if (delta >= 0) + break; + if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta))) + return false; + } + } + // We have definitely reached our waste limit so set the state if it isn't already + wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING); + // And tell the caller to proceed with pessimistic locking + return true; + } + + private static int avoidReservedValues(int wasteTracker) + { + if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING) + return wasteTracker + 1; + return wasteTracker; + } + + private static final class Holder + { + final DeletionInfo deletionInfo; + // the btree of rows + final Object[] tree; + final MemtableRowData staticRow; + final RowStats stats; + + Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats) + { + this.tree = tree; + this.deletionInfo = deletionInfo; + this.staticRow = staticRow; + this.stats = stats; + } + + Holder with(DeletionInfo info) + { + return new Holder(this.tree, info, this.staticRow, this.stats); + } + } + + // the function we provide to the btree utilities to perform any column replacements + private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData> + { + final AtomicBTreePartition updating; + final MemtableAllocator allocator; + final OpOrder.Group writeOp; + final Updater indexer; + final int nowInSec; + Holder ref; + long dataSize; + long 
heapSize; + long colUpdateTimeDelta = Long.MAX_VALUE; + final MemtableRowData.ReusableRow row; + final MemtableAllocator.DataReclaimer reclaimer; + final MemtableAllocator.RowAllocator rowAllocator; + List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree + + private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) + { + this.updating = updating; + this.allocator = allocator; + this.writeOp = writeOp; + this.indexer = indexer; + this.nowInSec = FBUtilities.nowInSeconds(); + this.row = allocator.newReusableRow(); + this.reclaimer = allocator.reclaimer(); + this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp); + } + + public MemtableRowData apply(Row insert) + { + rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic()); + insert.copyTo(rowAllocator); + MemtableRowData data = rowAllocator.allocatedRowData(); + + insertIntoIndexes(insert); + + this.dataSize += data.dataSize(); + this.heapSize += data.unsharedHeapSizeExcludingData(); + if (inserted == null) + inserted = new ArrayList<>(); + inserted.add(data); + return data; + } + + public MemtableRowData apply(MemtableRowData existing, Row update) + { + Columns mergedColumns = existing.columns().mergeTo(update.columns()); + rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic()); + + colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer)); + + MemtableRowData reconciled = rowAllocator.allocatedRowData(); + + dataSize += reconciled.dataSize() - existing.dataSize(); + heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData(); + if (inserted == null) + inserted = new ArrayList<>(); + inserted.add(reconciled); + discard(existing); + + return reconciled; + } + + private void insertIntoIndexes(Row toInsert) + { + if (indexer == SecondaryIndexManager.nullUpdater) + return; + + maybeIndexPrimaryKeyColumns(toInsert); + Clustering clustering = toInsert.clustering(); + for (Cell cell : toInsert) + indexer.insert(clustering, cell); + } + + private void maybeIndexPrimaryKeyColumns(Row row) + { + // We want to update a primary key index with the most up to date info contains in that inserted row (if only for + // backward compatibility). Note that if there is an index but not a partition key or clustering column one, we've + // wasting this work. We might be able to avoid that if row indexing was pushed in the index updater. 
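+ // Start from the primary key liveness info and take the biggest timestamp (and its ttl) among the live cells, so the index entry reflects the row's most recent write.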
+ long timestamp = row.primaryKeyLivenessInfo().timestamp(); + int ttl = row.primaryKeyLivenessInfo().ttl(); + + for (Cell cell : row) + { + long cellTimestamp = cell.livenessInfo().timestamp(); + if (cell.isLive(nowInSec)) + { + if (cellTimestamp > timestamp) + { + timestamp = cellTimestamp; + ttl = cell.livenessInfo().ttl(); + } + } + } + + indexer.maybeIndex(row.clustering(), timestamp, ttl, row.deletion()); + } + + protected void reset() + { + this.dataSize = 0; + this.heapSize = 0; + if (inserted != null) + { + for (MemtableRowData row : inserted) + abort(row); + inserted.clear(); + } + reclaimer.cancel(); + } + + protected void abort(MemtableRowData abort) + { + reclaimer.reclaimImmediately(abort); + } + + protected void discard(MemtableRowData discard) + { + reclaimer.reclaim(discard); + } + + public boolean abortEarly() + { + return updating.ref != ref; + } + + public void allocated(long heapSize) + { + this.heapSize += heapSize; + } + + protected void finish() + { + allocator.onHeap().allocate(heapSize, writeOp); + reclaimer.commit(); + } + } +}
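The addAllWithSizeDelta/updateWastedAllocationTracker pair above is the heart of this class: writers first attempt a lock-free compare-and-swap of the immutable Holder snapshot, and only fall back to serializing on the partition's monitor once too much work has been wasted on failed attempts. The following stand-alone sketch shows the shape of that pattern; the names (OptimisticRegister, MAX_WASTED_TRIES) are illustrative only, and it substitutes a simple retry count and a ReentrantLock for the patch's byte-based, real-time-decayed waste tracker and Locks.monitorEnterUnsafe:

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
    import java.util.concurrent.locks.ReentrantLock;

    public class OptimisticRegister
    {
        // The real patch tracks wasted *bytes* against a real-time allowance;
        // a plain retry count keeps this sketch short.
        private static final int MAX_WASTED_TRIES = 8;

        // Immutable snapshot, playing the role of AtomicBTreePartition.Holder.
        private static final class Holder
        {
            final long value;
            Holder(long value) { this.value = value; }
        }

        private volatile Holder ref = new Holder(0);
        private volatile int wasteTracker;

        private static final AtomicReferenceFieldUpdater<OptimisticRegister, Holder> refUpdater =
            AtomicReferenceFieldUpdater.newUpdater(OptimisticRegister.class, Holder.class, "ref");
        private static final AtomicIntegerFieldUpdater<OptimisticRegister> wasteUpdater =
            AtomicIntegerFieldUpdater.newUpdater(OptimisticRegister.class, "wasteTracker");

        // Stand-in for Locks.monitorEnterUnsafe(this)/monitorExitUnsafe(this).
        private final ReentrantLock lock = new ReentrantLock();

        public boolean usePessimisticLocking()
        {
            return wasteTracker >= MAX_WASTED_TRIES;
        }

        public void add(long delta)
        {
            boolean locked = usePessimisticLocking();
            if (locked)
                lock.lock();
            try
            {
                while (true)
                {
                    Holder current = ref;
                    Holder next = new Holder(current.value + delta); // wasted if the CAS loses
                    if (refUpdater.compareAndSet(this, current, next))
                        return;

                    // The CAS lost the race, so the snapshot and allocation above were
                    // wasted work; past the limit, every writer falls back to the lock.
                    if (!locked && wasteUpdater.incrementAndGet(this) >= MAX_WASTED_TRIES)
                    {
                        lock.lock();
                        locked = true;
                    }
                }
            }
            finally
            {
                if (locked)
                    lock.unlock();
            }
        }
    }

Note that, as in the patch, taking the lock does not stop other writers from racing on the CAS; it only stops writers that have already proven wasteful from wasting more, which is why the CAS loop is retained even on the pessimistic path.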
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java new file mode 100644 index 0000000..dc482f3 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.cache.IRowCacheEntry; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.io.ISerializer; + +/** + * A partition stored in the partition cache. + * + * Note that in practice, the only implementation of this is {@link ArrayBackedPartition}; + * we keep this interface mainly 1) to make it clear what we need from a partition in the cache + * (that we don't need otherwise) and 2) because {@code ArrayBackedPartition} is used for other + * purposes (than caching) and hence using {@code CachedPartition} when we talk about caching is + * clearer. + */ +public interface CachedPartition extends Partition, IRowCacheEntry +{ + public static final ISerializer<CachedPartition> cacheSerializer = new ArrayBackedCachedPartition.Serializer(); + + /** + * The number of {@code Row} objects in this cached partition. + * + * Please note that this is <b>not</b> the number of <em>live</em> rows since + * some of the rows may only contain deleted (or expired) information. + * + * @return the number of rows in the partition. + */ + public int rowCount(); + + /** + * The number of rows that were live at the time the partition was cached. + * + * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this. + * + * @return the number of rows in this partition that were live at the time the + * partition was cached (this can be different from the number of live rows now + * due to expiring cells). + */ + public int cachedLiveRows(); + + /** + * The number of rows in this cached partition that have at least one non-expiring + * non-deleted cell. + * + * Note that this is generally not a very meaningful number, but this is used by + * {@link DataLimits#hasEnoughLiveData} as an optimization. + * + * @return the number of rows that have at least one non-expiring non-deleted cell. + */ + public int rowsWithNonExpiringCells(); + + /** + * The last row in this cached partition (in other words, the row with the + * biggest clustering that the partition contains). + * + * @return the last row of the partition, or {@code null} if the partition is empty.
+ */ + public Row lastRow(); + + /** + * The number of {@code cell} objects that are not tombstones in this cached partition. + + * Please note that this is <b>not</b> the number of <em>live</em> cells since + * some of the cells might be expired. + * + * @return the number of non-tombstone cells in the partition. + */ + public int nonTombstoneCellCount(); + + /** + * The number of cells in this cached partition that are neither tombstones nor expiring. + * + * Note that this is generally not a very meaningful number, but this is used by + * {@link DataLimits#hasEnoughLiveData} as an optimization. + * + * @return the number of cells that are neither tombstones nor expiring. + */ + public int nonExpiringLiveCells(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java new file mode 100644 index 0000000..16445e7 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CountingPartitionIterator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.DataLimits; + +public class CountingPartitionIterator extends WrappingPartitionIterator +{ + protected final DataLimits.Counter counter; + + public CountingPartitionIterator(PartitionIterator result, DataLimits.Counter counter) + { + super(result); + this.counter = counter; + } + + public CountingPartitionIterator(PartitionIterator result, DataLimits limits, int nowInSec) + { + this(result, limits.newCounter(nowInSec, true)); + } + + public DataLimits.Counter counter() + { + return counter; + } + + @Override + public boolean hasNext() + { + if (counter.isDone()) + return false; + + return super.hasNext(); + } + + @Override + @SuppressWarnings("resource") // Close through the closing of the returned 'CountingRowIterator' (and CountingRowIterator shouldn't throw) + public RowIterator next() + { + return new CountingRowIterator(super.next(), counter); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java new file mode 100644 index 0000000..4ad321e --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CountingRowIterator.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.DataLimits; + +public class CountingRowIterator extends WrappingRowIterator +{ + protected final DataLimits.Counter counter; + + public CountingRowIterator(RowIterator iter, DataLimits.Counter counter) + { + super(iter); + this.counter = counter; + + counter.newPartition(iter.partitionKey(), iter.staticRow()); + } + + @Override + public boolean hasNext() + { + if (counter.isDoneForPartition()) + return false; + + return super.hasNext(); + } + + @Override + public Row next() + { + Row row = super.next(); + counter.newRow(row); + return row; + } + + @Override + public void close() + { + super.close(); + counter.endOfPartition(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java new file mode 100644 index 0000000..52eedd4 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredPartitionIterator.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.DataLimits; + +public class CountingUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator +{ + protected final DataLimits.Counter counter; + + public CountingUnfilteredPartitionIterator(UnfilteredPartitionIterator result, DataLimits.Counter counter) + { + super(result); + this.counter = counter; + } + + public DataLimits.Counter counter() + { + return counter; + } + + @Override + public boolean hasNext() + { + if (counter.isDone()) + return false; + + return super.hasNext(); + } + + @Override + public UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + return new CountingUnfilteredRowIterator(iter, counter); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java new file mode 100644 index 0000000..acaef5d --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.DataLimits; + +public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator +{ + private final DataLimits.Counter counter; + + public CountingUnfilteredRowIterator(UnfilteredRowIterator iter, DataLimits.Counter counter) + { + super(iter); + this.counter = counter; + + counter.newPartition(iter.partitionKey(), iter.staticRow()); + } + + public DataLimits.Counter counter() + { + return counter; + } + + @Override + public boolean hasNext() + { + if (counter.isDoneForPartition()) + return false; + + return super.hasNext(); + } + + @Override + public Unfiltered next() + { + Unfiltered unfiltered = super.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + counter.newRow((Row) unfiltered); + return unfiltered; + } + + @Override + public void close() + { + super.close(); + counter.endOfPartition(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java new file mode 100644 index 0000000..813654d --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; + +public class FilteredPartition extends AbstractPartitionData implements Iterable<Row> +{ + private FilteredPartition(CFMetaData metadata, + DecoratedKey partitionKey, + PartitionColumns columns, + int initialRowCapacity, + boolean sortable) + { + super(metadata, partitionKey, DeletionTime.LIVE, columns, initialRowCapacity, sortable); + } + + /** + * Create a FilteredPartition holding all the rows of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + */ + public static FilteredPartition create(RowIterator iterator) + { + FilteredPartition partition = new FilteredPartition(iterator.metadata(), + iterator.partitionKey(), + iterator.columns(), + 4, + iterator.isReverseOrder()); + + partition.staticRow = iterator.staticRow().takeAlias(); + + Writer writer = partition.new Writer(true); + + while (iterator.hasNext()) + iterator.next().copyTo(writer); + + // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering + // order. 
So if we've just added them in reverse clustering order, reverse them. + if (iterator.isReverseOrder()) + partition.reverse(); + + return partition; + } + + public RowIterator rowIterator() + { + final Iterator<Row> iter = iterator(); + return new RowIterator() + { + public CFMetaData metadata() + { + return metadata; + } + + public boolean isReverseOrder() + { + return false; + } + + public PartitionColumns columns() + { + return columns; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public Row staticRow() + { + return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow; + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public Row next() + { + return iter.next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + } + }; + } + + @Override + public String toString() + { + try (RowIterator iterator = rowIterator()) + { + StringBuilder sb = new StringBuilder(); + CFMetaData metadata = iterator.metadata(); + PartitionColumns columns = iterator.columns(); + + sb.append(String.format("[%s.%s] key=%s columns=%s reversed=%b", + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), + columns, + iterator.isReverseOrder())); + + if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(iterator.staticRow().toString(metadata)); + + while (iterator.hasNext()) + sb.append("\n ").append(iterator.next().toString(metadata)); + + return sb.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java new file mode 100644 index 0000000..c40109b --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; + +/** + * Abstract class to make it easier to write iterators that filter some + * parts of another iterator (used for purging tombstones and removing dropped columns). + */ +public abstract class FilteringPartitionIterator extends WrappingUnfilteredPartitionIterator +{ + private UnfilteredRowIterator next; + + protected FilteringPartitionIterator(UnfilteredPartitionIterator iter) + { + super(iter); + } + + // The filter to use for filtering row contents. 
It is null by default, meaning no particular filtering, + // but can be overridden by subclasses. Please see FilteringRowIterator for details on how this is used. + protected FilteringRow makeRowFilter() + { + return null; + } + + // Whether or not we should bother filtering the provided rows iterator. This + // exists mainly for performance reasons. + protected boolean shouldFilter(UnfilteredRowIterator iterator) + { + return true; + } + + protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) + { + return true; + } + + protected boolean includePartitionDeletion(DeletionTime dt) + { + return true; + } + + // Allows the returned range tombstone marker to be modified. This is called *after* includeRangeTombstoneMarker has been called. + protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) + { + return marker; + } + + // Called when a particular partition is skipped due to being empty post filtering + protected void onEmpty(DecoratedKey key) + { + } + + public boolean hasNext() + { + while (next == null && super.hasNext()) + { + UnfilteredRowIterator iterator = super.next(); + if (shouldFilter(iterator)) + { + next = new FilteringIterator(iterator); + if (!isForThrift() && next.isEmpty()) + { + onEmpty(iterator.partitionKey()); + iterator.close(); + next = null; + } + } + else + { + next = iterator; + } + } + return next != null; + } + + public UnfilteredRowIterator next() + { + UnfilteredRowIterator toReturn = next; + next = null; + return toReturn; + } + + @Override + public void close() + { + try + { + super.close(); + } + finally + { + if (next != null) + next.close(); + } + } + + private class FilteringIterator extends FilteringRowIterator + { + private FilteringIterator(UnfilteredRowIterator iterator) + { + super(iterator); + } + + @Override + protected FilteringRow makeRowFilter() + { + return FilteringPartitionIterator.this.makeRowFilter(); + } + + @Override + protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) + { + return FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker); + } + + @Override + protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) + { + return FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed); + } + + @Override + protected boolean includePartitionDeletion(DeletionTime dt) + { + return FilteringPartitionIterator.this.includePartitionDeletion(dt); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/Partition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/Partition.java b/src/java/org/apache/cassandra/db/partitions/Partition.java new file mode 100644 index 0000000..71d0411 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/Partition.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Slices; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.utils.SearchIterator; + +/** + * In-memory representation of a Partition. + * + * Note that most of the storage engine works through iterators (UnfilteredPartitionIterator) to + * avoid "materializing" a full partition/query response in memory as much as possible, + * and so Partition objects should be used as sparingly as possible. There are a couple + * of cases where we do need to represent a partition in memory (memtables and the row cache). + */ +public interface Partition +{ + public CFMetaData metadata(); + public DecoratedKey partitionKey(); + public DeletionTime partitionLevelDeletion(); + + public PartitionColumns columns(); + + public RowStats stats(); + + /** + * Whether the partition object has no information at all, including any deletion information. + */ + public boolean isEmpty(); + + /** + * Returns the row corresponding to the provided clustering, or null if there is no such row. + */ + public Row getRow(Clustering clustering); + + /** + * Returns an iterator that allows searching for specific rows efficiently. + */ + public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed); + + /** + * Returns an UnfilteredRowIterator over all the rows/RT contained by this partition. + */ + public UnfilteredRowIterator unfilteredIterator(); + + /** + * Returns an UnfilteredRowIterator over the rows/RT contained by this partition + * selected by the provided slices. + */ + public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java new file mode 100644 index 0000000..36358fc --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.Iterator; + +import org.apache.cassandra.db.rows.*; + +/** + * An iterator over a number of (filtered) partitions. + * + * PartitionIterator is to RowIterator what UnfilteredPartitionIterator is to UnfilteredRowIterator, + * though unlike UnfilteredPartitionIterator, it is not guaranteed that the RowIterators + * returned are in partitioner order. + * + * The object returned by a call to next() is only guaranteed to be + * valid until the next call to hasNext() or next(). If a consumer wants to keep a + * reference to the returned objects for longer than the iteration, it must + * make a copy of them explicitly. + */ +public interface PartitionIterator extends Iterator<RowIterator>, AutoCloseable +{ + public void close(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java new file mode 100644 index 0000000..219aa5a --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.*; +import java.security.MessageDigest; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.util.FileUtils; + +public abstract class PartitionIterators +{ + private PartitionIterators() {} + + public static final PartitionIterator EMPTY = new PartitionIterator() + { + public boolean hasNext() + { + return false; + } + + public RowIterator next() + { + throw new NoSuchElementException(); + } + + public void remove() + { + } + + public void close() + { + } + }; + + @SuppressWarnings("resource") // The created resources are returned right away + public static RowIterator getOnlyElement(final PartitionIterator iter, SinglePartitionReadCommand<?> command) + { + // If the query has no results, we'll get an empty iterator, but we still + // want a RowIterator out of this method, so we return an empty one. + RowIterator toReturn = iter.hasNext() + ? iter.next() + : RowIterators.emptyIterator(command.metadata(), + command.partitionKey(), + command.clusteringIndexFilter().isReversed()); + + // Note that in general, we should wrap the result so that its close method actually + // closes the whole PartitionIterator.
return new WrappingRowIterator(toReturn) + { + public void close() + { + try + { + super.close(); + } + finally + { + // asserting this only now because it bothers UnfilteredPartitionIterators.Serializer (which might be used + // under the provided DataIter) if hasNext() is called before the previously returned iterator has been fully consumed. + assert !iter.hasNext(); + + iter.close(); + } + } + }; + } + + @SuppressWarnings("resource") // The created resources are returned right away + public static PartitionIterator concat(final List<PartitionIterator> iterators) + { + if (iterators.size() == 1) + return iterators.get(0); + + return new PartitionIterator() + { + private int idx = 0; + + public boolean hasNext() + { + while (idx < iterators.size()) + { + if (iterators.get(idx).hasNext()) + return true; + + ++idx; + } + return false; + } + + public RowIterator next() + { + if (!hasNext()) + throw new NoSuchElementException(); + return iterators.get(idx).next(); + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + FileUtils.closeQuietly(iterators); + } + }; + } + + public static void digest(PartitionIterator iterator, MessageDigest digest) + { + while (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) + { + RowIterators.digest(partition, digest); + } + } + } + + public static PartitionIterator singletonIterator(RowIterator iterator) + { + return new SingletonPartitionIterator(iterator); + } + + public static void consume(PartitionIterator iterator) + { + while (iterator.hasNext()) + { + try (RowIterator partition = iterator.next()) + { + while (partition.hasNext()) + partition.next(); + } + } + } + + /** + * Wraps the provided iterator so it logs the returned rows for debugging purposes. + * <p> + * Note that this is only meant for debugging as it can produce a very large + * amount of logging at INFO. + */ + @SuppressWarnings("resource") // The created resources are returned right away + public static PartitionIterator loggingIterator(PartitionIterator iterator, final String id) + { + return new WrappingPartitionIterator(iterator) + { + public RowIterator next() + { + return RowIterators.loggingIterator(super.next(), id); + } + }; + } + + private static class SingletonPartitionIterator extends AbstractIterator<RowIterator> implements PartitionIterator + { + private final RowIterator iterator; + private boolean returned; + + private SingletonPartitionIterator(RowIterator iterator) + { + this.iterator = iterator; + } + + protected RowIterator computeNext() + { + if (returned) + return endOfData(); + + returned = true; + return iterator; + } + + public void close() + { + iterator.close(); + } + } +}
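A detail worth noting in getOnlyElement above: the RowIterator handed back must, on close, also close the PartitionIterator it was pulled from, because the caller never sees the outer iterator. A minimal sketch of that resource-chaining idea, with hypothetical names (CloseableIterator, ClosingWrapper) rather than the patch's actual types:

    import java.util.Iterator;

    // PartitionIterator/RowIterator both follow this no-checked-exception close contract.
    interface CloseableIterator<T> extends Iterator<T>, AutoCloseable
    {
        void close();
    }

    final class ClosingWrapper<T> implements CloseableIterator<T>
    {
        private final CloseableIterator<T> inner;  // e.g. the single RowIterator
        private final CloseableIterator<?> outer;  // e.g. the PartitionIterator it came from

        ClosingWrapper(CloseableIterator<T> inner, CloseableIterator<?> outer)
        {
            this.inner = inner;
            this.outer = outer;
        }

        public boolean hasNext() { return inner.hasNext(); }
        public T next()          { return inner.next(); }

        public void close()
        {
            try
            {
                inner.close();
            }
            finally
            {
                // Release the whole pipeline even if closing the inner iterator throws,
                // just as the WrappingRowIterator in getOnlyElement closes 'iter'.
                outer.close();
            }
        }
    }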

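Similarly, the Counting* wrappers earlier in this commit (CountingPartitionIterator, CountingRowIterator and their unfiltered variants) all share one decorator shape: delegate to the wrapped iterator, feed whatever flows past into a shared DataLimits.Counter, and cut iteration short once the counter reports the limit reached. A simplified, hypothetical analogue (Counter here stands in for DataLimits.Counter):

    import java.util.Arrays;
    import java.util.Iterator;

    final class Counter
    {
        private final int limit;
        private int counted;

        Counter(int limit) { this.limit = limit; }

        void newItem()   { counted++; }
        boolean isDone() { return counted >= limit; }
    }

    final class CountingIterator<T> implements Iterator<T>
    {
        private final Iterator<T> wrapped;
        private final Counter counter;

        CountingIterator(Iterator<T> wrapped, Counter counter)
        {
            this.wrapped = wrapped;
            this.counter = counter;
        }

        public boolean hasNext()
        {
            // Stop early once the shared limit is hit, even if more data remains,
            // mirroring CountingRowIterator.hasNext() checking isDoneForPartition().
            return !counter.isDone() && wrapped.hasNext();
        }

        public T next()
        {
            T next = wrapped.next();
            counter.newItem(); // count items as they flow past, like counter.newRow(row)
            return next;
        }

        public static void main(String[] args)
        {
            Counter counter = new Counter(2);
            Iterator<String> it = new CountingIterator<>(Arrays.asList("a", "b", "c").iterator(), counter);
            while (it.hasNext())
                System.out.println(it.next()); // prints a and b; the third element never surfaces
        }
    }

Because the same counter instance is handed to every per-partition wrapper, a global row limit naturally spans partition boundaries.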