Remove ArrayBackedPartition and hierarchy. Introduces AbstractBTreePartition to share code between ImmutableBTreePartition and AtomicBTreePartition, eliminating much code duplication between the two hierarchies.
patch by benedict; reviewed by ariel for CASSANDRA-9932 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e51f83b6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e51f83b6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e51f83b6 Branch: refs/heads/trunk Commit: e51f83b60edf1f9ee12ef6a3083d3acbf85805f7 Parents: a186ac6 Author: Benedict Elliott Smith <[email protected]> Authored: Wed Jul 29 15:14:01 2015 +0100 Committer: Benedict Elliott Smith <[email protected]> Committed: Wed Aug 12 23:48:30 2015 +0200 ---------------------------------------------------------------------- .../apache/cassandra/cql3/UpdateParameters.java | 4 +- src/java/org/apache/cassandra/db/Columns.java | 4 +- .../cassandra/db/HintedHandOffManager.java | 4 +- .../org/apache/cassandra/db/LegacyLayout.java | 7 +- .../org/apache/cassandra/db/ReadResponse.java | 12 +- .../apache/cassandra/db/RowUpdateBuilder.java | 7 +- .../db/SinglePartitionNamesCommand.java | 8 +- .../db/SinglePartitionReadCommand.java | 2 +- .../cassandra/db/UnfilteredDeserializer.java | 2 +- .../columniterator/SSTableReversedIterator.java | 44 +- .../cassandra/db/compaction/Scrubber.java | 2 +- .../apache/cassandra/db/filter/RowFilter.java | 2 +- .../AbstractSimplePerColumnSecondaryIndex.java | 4 +- .../db/index/composites/CompositesIndex.java | 2 +- .../cassandra/db/index/keys/KeysSearcher.java | 2 +- .../db/partitions/AbstractBTreePartition.java | 403 +++++++++++++ .../AbstractThreadUnsafePartition.java | 399 ------------ .../partitions/ArrayBackedCachedPartition.java | 294 --------- .../db/partitions/ArrayBackedPartition.java | 114 ---- .../db/partitions/AtomicBTreePartition.java | 240 +------- .../db/partitions/CachedBTreePartition.java | 249 ++++++++ .../db/partitions/CachedPartition.java | 10 +- .../db/partitions/FilteredPartition.java | 89 +-- .../db/partitions/ImmutableBTreePartition.java | 93 +++ 
.../db/partitions/PartitionUpdate.java | 311 +++------- .../cassandra/db/rows/BTreeBackedRow.java | 602 ------------------- .../org/apache/cassandra/db/rows/BTreeRow.java | 602 +++++++++++++++++++ .../apache/cassandra/db/rows/EncodingStats.java | 11 + src/java/org/apache/cassandra/db/rows/Row.java | 2 +- .../apache/cassandra/db/rows/RowIterators.java | 22 +- src/java/org/apache/cassandra/db/rows/Rows.java | 4 +- .../rows/UnfilteredRowIteratorSerializer.java | 2 +- .../db/rows/UnfilteredRowIterators.java | 16 +- .../cassandra/db/rows/UnfilteredSerializer.java | 2 +- .../cassandra/db/view/MaterializedView.java | 18 +- .../apache/cassandra/db/view/TemporalRow.java | 4 +- .../io/sstable/SSTableSimpleIterator.java | 2 +- .../apache/cassandra/service/CacheService.java | 6 +- .../apache/cassandra/service/DataResolver.java | 2 +- .../cassandra/thrift/CassandraServer.java | 8 +- .../cassandra/thrift/ThriftResultsMerger.java | 4 +- .../cassandra/triggers/TriggerExecutor.java | 4 +- .../org/apache/cassandra/utils/btree/BTree.java | 89 ++- .../cassandra/utils/btree/UpdateFunction.java | 27 +- .../utils/memory/AbstractAllocator.java | 10 +- .../utils/memory/MemtableBufferAllocator.java | 3 +- .../apache/cassandra/utils/LongBTreeTest.java | 151 +++-- test/unit/org/apache/cassandra/Util.java | 10 +- .../cassandra/cache/CacheProviderTest.java | 18 +- .../org/apache/cassandra/cql3/CQLTester.java | 2 +- .../cassandra/db/BatchlogManagerTest.java | 21 +- .../cassandra/db/DeletePartitionTest.java | 2 +- .../org/apache/cassandra/db/PartitionTest.java | 10 +- .../apache/cassandra/db/RangeTombstoneTest.java | 4 +- .../apache/cassandra/db/RowIndexEntryTest.java | 2 +- test/unit/org/apache/cassandra/db/RowTest.java | 4 +- .../db/compaction/CompactionsPurgeTest.java | 10 +- .../cassandra/db/marshal/CompositeTypeTest.java | 4 +- .../db/marshal/DynamicCompositeTypeTest.java | 6 +- .../rows/RowAndDeletionMergeIteratorTest.java | 2 +- .../rows/UnfilteredRowIteratorsMergeTest.java | 2 +- 
.../io/sstable/SSTableRewriterTest.java | 4 +- .../streaming/StreamingTransferTest.java | 4 +- .../cassandra/triggers/TriggerExecutorTest.java | 2 +- .../org/apache/cassandra/utils/BTreeTest.java | 118 +++- 65 files changed, 1943 insertions(+), 2181 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 519eb4b..1c1dd43 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -120,13 +120,13 @@ public class UpdateParameters if (clustering == Clustering.STATIC_CLUSTERING) { if (staticBuilder == null) - staticBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.statics, nowInSec); + staticBuilder = BTreeRow.unsortedBuilder(updatedColumns.statics, nowInSec); builder = staticBuilder; } else { if (regularBuilder == null) - regularBuilder = BTreeBackedRow.unsortedBuilder(updatedColumns.regulars, nowInSec); + regularBuilder = BTreeRow.unsortedBuilder(updatedColumns.regulars, nowInSec); builder = regularBuilder; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index c584b4c..0b29830 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -38,6 +38,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; import 
org.apache.cassandra.utils.btree.BTreeSearchIterator; +import org.apache.cassandra.utils.btree.UpdateFunction; /** * An immutable and sorted list of (non-PK) columns for a given table. @@ -243,7 +244,8 @@ public class Columns implements Iterable<ColumnDefinition> if (this == NONE) return other; - Object[] tree = BTree.<ColumnDefinition>merge(this.columns, other.columns, Comparator.naturalOrder()); + Object[] tree = BTree.<ColumnDefinition>merge(this.columns, other.columns, Comparator.naturalOrder(), + UpdateFunction.noOp()); if (tree == this.columns) return this; if (tree == other.columns) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 4656d41..8bea2e8 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -150,7 +150,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version)); Cell cell = BufferCell.expiring(hintColumn, now, ttl, FBUtilities.nowInSeconds(), value); - return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeBackedRow.singleCellRow(clustering, cell))); + return new Mutation(PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, key, BTreeRow.singleCellRow(clustering, cell))); } /* @@ -193,7 +193,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean private static void deleteHint(ByteBuffer tokenBytes, Clustering clustering, long timestamp) { Cell cell = BufferCell.tombstone(hintColumn, timestamp, FBUtilities.nowInSeconds()); - PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, 
tokenBytes, BTreeBackedRow.singleCellRow(clustering, cell)); + PartitionUpdate upd = PartitionUpdate.singleRowUpdate(SystemKeyspace.Hints, tokenBytes, BTreeRow.singleCellRow(clustering, cell)); new Mutation(upd).applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index fb896f5..3900d96 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -319,7 +319,7 @@ public abstract class LegacyLayout { // we need to extract the range tombstone so materialize the partition. Since this is // used for the on-wire format, this is not worst than it used to be. - final ArrayBackedPartition partition = ArrayBackedPartition.create(iterator); + final ImmutableBTreePartition partition = ImmutableBTreePartition.create(iterator); DeletionInfo info = partition.deletionInfo(); Pair<LegacyRangeTombstoneList, Iterator<LegacyCell>> pair = fromRowIterator(partition.metadata(), partition.iterator(), partition.staticRow()); @@ -538,7 +538,7 @@ public abstract class LegacyLayout for (ColumnDefinition column : statics) columnsToFetch.add(column.name.bytes); - Row.Builder builder = BTreeBackedRow.unsortedBuilder(statics, FBUtilities.nowInSeconds()); + Row.Builder builder = BTreeRow.unsortedBuilder(statics, FBUtilities.nowInSeconds()); builder.newRow(Clustering.STATIC_CLUSTERING); boolean foundOne = false; @@ -1058,8 +1058,7 @@ public abstract class LegacyLayout // We cannot use a sorted builder because we don't have exactly the same ordering in 3.0 and pre-3.0. 
More precisely, within a row, we // store all simple columns before the complex ones in 3.0, which we use to sort everything sorted by the column name before. Note however // that the unsorted builder won't have to reconcile cells, so the exact value we pass for nowInSec doesn't matter. - this.builder = BTreeBackedRow.unsortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars, FBUtilities.nowInSeconds()); - + this.builder = BTreeRow.unsortedBuilder(isStatic ? metadata.partitionColumns().statics : metadata.partitionColumns().regulars, FBUtilities.nowInSeconds()); } public static CellGrouper staticGrouper(CFMetaData metadata, SerializationHelper helper) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index a66cb6a..5f40210 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -214,9 +214,9 @@ public abstract class ReadResponse */ private static class LegacyRemoteDataResponse extends ReadResponse { - private final List<ArrayBackedPartition> partitions; + private final List<ImmutableBTreePartition> partitions; - private LegacyRemoteDataResponse(List<ArrayBackedPartition> partitions) + private LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions) { super(null); // we never serialize LegacyRemoteDataResponses, so we don't care about the metadata this.partitions = partitions; @@ -245,7 +245,7 @@ public abstract class ReadResponse public UnfilteredRowIterator next() { - ArrayBackedPartition partition = partitions.get(idx++); + ImmutableBTreePartition partition = partitions.get(idx++); ClusteringIndexFilter filter = command.clusteringIndexFilter(partition.partitionKey()); @@ -340,7 +340,7 @@ public 
abstract class ReadResponse try { - return new LegacyRemoteDataResponse(Collections.singletonList(ArrayBackedPartition.create(rowIterator))); + return new LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator))); } finally { @@ -440,13 +440,13 @@ public abstract class ReadResponse { // Contrarily to serialize, we have to read the number of serialized partitions here. int partitionCount = in.readInt(); - ArrayList<ArrayBackedPartition> partitions = new ArrayList<>(partitionCount); + ArrayList<ImmutableBTreePartition> partitions = new ArrayList<>(partitionCount); for (int i = 0; i < partitionCount; i++) { ByteBuffer key = ByteBufferUtil.readWithShortLength(in); try (UnfilteredRowIterator partition = LegacyLayout.deserializeLegacyPartition(in, version, SerializationHelper.Flag.FROM_REMOTE, key)) { - partitions.add(ArrayBackedPartition.create(partition)); + partitions.add(ImmutableBTreePartition.create(partition)); } } return new LegacyRemoteDataResponse(partitions); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java index 372ba04..f1d17b6 100644 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -31,7 +31,6 @@ import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; /** @@ -80,7 +79,7 @@ public class RowUpdateBuilder assert staticBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; assert regularBuilder == null : "Cannot 
add the clustering twice to the same row"; - regularBuilder = BTreeBackedRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds()); + regularBuilder = BTreeRow.unsortedBuilder(update.columns().regulars, FBUtilities.nowInSeconds()); regularBuilder.newRow(clustering); // If a CQL table, add the "row marker" @@ -105,7 +104,7 @@ public class RowUpdateBuilder assert regularBuilder == null : "Cannot update both static and non-static columns with the same RowUpdateBuilder object"; if (staticBuilder == null) { - staticBuilder = BTreeBackedRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds()); + staticBuilder = BTreeRow.unsortedBuilder(update.columns().statics, FBUtilities.nowInSeconds()); staticBuilder.newRow(Clustering.STATIC_CLUSTERING); } return staticBuilder; @@ -186,7 +185,7 @@ public class RowUpdateBuilder assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty()); boolean isStatic = clusteringValues.length != update.metadata().comparator.size(); - Row.Builder builder = BTreeBackedRow.sortedBuilder(isStatic ? update.columns().statics : update.columns().regulars); + Row.Builder builder = BTreeRow.sortedBuilder(isStatic ? 
update.columns().statics : update.columns().regulars); if (isStatic) builder.newRow(Clustering.STATIC_CLUSTERING); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index 518e299..f40da5b 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -96,7 +96,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); - ArrayBackedPartition result = null; + ImmutableBTreePartition result = null; ClusteringIndexNamesFilter filter = clusteringIndexFilter(); Tracing.trace("Merging memtable contents"); @@ -182,18 +182,18 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed()); } - private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result, boolean isRepaired) + private ImmutableBTreePartition add(UnfilteredRowIterator iter, ImmutableBTreePartition result, boolean isRepaired) { if (!isRepaired) oldestUnrepairedDeletionTime = Math.min(oldestUnrepairedDeletionTime, iter.stats().minLocalDeletionTime); int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1); if (result == null) - return ArrayBackedPartition.create(iter, maxRows); + return ImmutableBTreePartition.create(iter, maxRows); try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, false)), nowInSec())) { - return 
ArrayBackedPartition.create(merged, maxRows); + return ImmutableBTreePartition.create(merged, maxRows); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 1b688c9..d9b0e2b 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -317,7 +317,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter try { // We want to cache only rowsToCache rows - CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec()); + CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec()); if (sentinelSuccess && !toCache.isEmpty()) { Tracing.trace("Caching {} rows", toCache.rowCount()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index c00597a..2f75f34 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -121,7 +121,7 @@ public abstract class UnfilteredDeserializer super(metadata, in, helper); this.header = header; this.clusteringDeserializer = new ClusteringPrefix.Deserializer(metadata.comparator, in, header); - this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header)); + this.builder = 
BTreeRow.sortedBuilder(helper.fetchedRegularColumns(header)); } public boolean hasNext() throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index f4acd6f..4d2e294 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -23,10 +23,11 @@ import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.FileDataInput; +import org.apache.cassandra.utils.btree.BTree; /** * A Cell Iterator in reversed clustering order over SSTable @@ -123,7 +124,7 @@ public class SSTableReversedIterator extends AbstractSSTableIterator protected void setIterator(Slice slice) { assert buffer != null; - iterator = buffer.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); + iterator = buffer.built.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); } protected boolean hasNextInternal() throws IOException @@ -303,56 +304,49 @@ public class SSTableReversedIterator extends AbstractSSTableIterator } } - private class ReusablePartitionData extends AbstractThreadUnsafePartition + private class ReusablePartitionData { + private final CFMetaData metadata; + private final DecoratedKey partitionKey; + private final PartitionColumns 
columns; + private MutableDeletionInfo.Builder deletionBuilder; private MutableDeletionInfo deletionInfo; + private BTree.Builder<Row> rowBuilder; + private ImmutableBTreePartition built; private ReusablePartitionData(CFMetaData metadata, DecoratedKey partitionKey, PartitionColumns columns, int initialRowCapacity) { - super(metadata, partitionKey, columns, new ArrayList<>(initialRowCapacity)); - } - - public DeletionInfo deletionInfo() - { - return deletionInfo; + this.metadata = metadata; + this.partitionKey = partitionKey; + this.columns = columns; + this.rowBuilder = BTree.builder(metadata.comparator, initialRowCapacity); } - protected boolean canHaveShadowedData() - { - return false; - } - - public Row staticRow() - { - return Rows.EMPTY_STATIC_ROW; // we don't actually use that - } - - public EncodingStats stats() - { - return EncodingStats.NO_STATS; // we don't actually use that - } public void add(Unfiltered unfiltered) { if (unfiltered.isRow()) - rows.add((Row)unfiltered); + rowBuilder.add((Row)unfiltered); else deletionBuilder.add((RangeTombstoneMarker)unfiltered); } public void reset() { - rows.clear(); + built = null; + rowBuilder.reuse(); deletionBuilder = MutableDeletionInfo.builder(partitionLevelDeletion, metadata().comparator, false); } public void build() { deletionInfo = deletionBuilder.build(); + built = new ImmutableBTreePartition(metadata, partitionKey, columns, Rows.EMPTY_STATIC_ROW, rowBuilder.build(), + DeletionInfo.LIVE, EncodingStats.NO_STATS); deletionBuilder = null; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index f9e9e71..747b956 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ 
b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -385,7 +385,7 @@ public class Scrubber implements Closeable { // TODO bitch if the row is too large? if it is there's not much we can do ... outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey)); - outOfOrder.add(ArrayBackedPartition.create(iterator)); + outOfOrder.add(ImmutableBTreePartition.create(iterator)); } private void throwIfFatal(Throwable th) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 881e154..a5212aa 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -221,7 +221,7 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> // satisfied, which forces us to materialize the result (in theory we could materialize only // what we need which might or might not be everything, but we keep it simple since in practice // it's not worth that it has ever been). - ArrayBackedPartition result = ArrayBackedPartition.create(iter); + ImmutableBTreePartition result = ImmutableBTreePartition.create(iter); // The partition needs to have a row for every expression, and the expression needs to be valid. 
for (Expression expr : expressions) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java index b5ed7f6..eccd7e2 100644 --- a/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java +++ b/src/java/org/apache/cassandra/db/index/AbstractSimplePerColumnSecondaryIndex.java @@ -116,7 +116,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cellValue, path)); - Row row = BTreeBackedRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion); + Row row = BTreeRow.emptyDeletedRow(makeIndexClustering(rowKey, clustering, path), deletion); PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null); @@ -133,7 +133,7 @@ public abstract class AbstractSimplePerColumnSecondaryIndex extends PerColumnSec { DecoratedKey valueKey = getIndexKeyFor(getIndexedValue(rowKey, clustering, cell)); - Row row = BTreeBackedRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info); + Row row = BTreeRow.noCellLiveRow(makeIndexClustering(rowKey, clustering, cell), info); PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, valueKey, row); if (logger.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java 
b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java index 29f235c..df45ea2 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndex.java @@ -112,7 +112,7 @@ public abstract class CompositesIndex extends AbstractSimplePerColumnSecondaryIn public void delete(IndexedEntry entry, OpOrder.Group opGroup, int nowInSec) { - Row row = BTreeBackedRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec)); + Row row = BTreeRow.emptyDeletedRow(entry.indexClustering, new DeletionTime(entry.timestamp, nowInSec)); PartitionUpdate upd = PartitionUpdate.singleRowUpdate(indexCfs.metadata, entry.indexValue, row); indexCfs.apply(upd, SecondaryIndexManager.nullUpdater, opGroup, null); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 53a9b4a..4b70dcf 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -142,7 +142,7 @@ public class KeysSearcher extends SecondaryIndexSearcher { // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering // is the indexed name. Ans so we need to materialize the partition. - ArrayBackedPartition result = ArrayBackedPartition.create(iterator); + ImmutableBTreePartition result = ImmutableBTreePartition.create(iterator); iterator.close(); Row data = result.getRow(new Clustering(index.indexedColumn().name.bytes)); Cell cell = data == null ? 
null : data.getCell(baseCfs.metadata.compactValueColumn()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java new file mode 100644 index 0000000..41015b0 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -0,0 +1,403 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/
package org.apache.cassandra.db.partitions;

import java.util.Iterator;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;

import static org.apache.cassandra.utils.btree.BTree.Dir.desc;

/**
 * Base class for {@link Partition} implementations whose rows are stored in a BTree.
 *
 * The row tree, deletion info, static row and encoding stats are grouped in a {@link Holder}
 * that subclasses expose through {@link #holder()}. Every read path in this class fetches the
 * holder once and then works exclusively from that reference, so a single operation always sees
 * one consistent {@code Holder}, even if a subclass returns a different one on a later call.
 */
public abstract class AbstractBTreePartition implements Partition, Iterable<Row>
{
    /** A holder with an empty tree, live deletion info, the empty static row and no stats. */
    protected static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);

    protected final CFMetaData metadata;
    protected final DecoratedKey partitionKey;
    protected final PartitionColumns columns;

    /**
     * @return the holder carrying the current content of this partition.
     */
    protected abstract Holder holder();

    /**
     * @return the flag forwarded as the last argument of {@link RowAndDeletionMergeIterator}
     * when rows and range tombstones of this partition are merged.
     */
    protected abstract boolean canHaveShadowedData();

    protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey partitionKey, PartitionColumns columns)
    {
        this.metadata = metadata;
        this.partitionKey = partitionKey;
        this.columns = columns;
    }

    /**
     * Immutable grouping of everything that makes up the content of a partition.
     */
    protected static final class Holder
    {
        final DeletionInfo deletionInfo;
        // the btree of rows
        final Object[] tree;
        final Row staticRow;
        final EncodingStats stats;

        Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats)
        {
            this.tree = tree;
            this.deletionInfo = deletionInfo;
            this.staticRow = staticRow;
            this.stats = stats;
        }
    }

    /**
     * @return the deletion info of the current holder.
     */
    public DeletionInfo deletionInfo()
    {
        return holder().deletionInfo;
    }

    /**
     * @return the (unfiltered) static row of the current holder.
     */
    public Row staticRow()
    {
        return holder().staticRow;
    }

    /**
     * @return {@code true} if the partition has live deletion info, no rows and an empty static row.
     */
    public boolean isEmpty()
    {
        Holder holder = holder();
        return holder.deletionInfo.isLive() && BTree.isEmpty(holder.tree) && holder.staticRow.isEmpty();
    }

    /**
     * @return {@code true} if the row tree contains at least one row (the static row is not considered).
     */
    public boolean hasRows()
    {
        Holder holder = holder();
        return !BTree.isEmpty(holder.tree);
    }

    public CFMetaData metadata()
    {
        return metadata;
    }

    public DecoratedKey partitionKey()
    {
        return partitionKey;
    }

    /**
     * @return the partition-level deletion of the current holder.
     */
    public DeletionTime partitionLevelDeletion()
    {
        return holder().deletionInfo.getPartitionDeletion();
    }

    /**
     * @return all the partition columns of the table metadata.
     */
    public PartitionColumns columns()
    {
        // We don't really know which columns will be part of the update, so assume it's all of them
        // NOTE(review): this ignores the 'columns' field given to the constructor, whereas the
        // iterators created below report that field. Kept as is because subclasses may rely on
        // it -- confirm whether returning the field is the intended behaviour.
        return metadata.partitionColumns();
    }

    /**
     * @return the encoding stats of the current holder.
     */
    public EncodingStats stats()
    {
        return holder().stats;
    }

    /**
     * Looks up a single row.
     *
     * @param clustering the clustering of the row to fetch; may be {@link Clustering#STATIC_CLUSTERING}.
     * @return the row, with the deletion that applies to it set on it, or {@code null} if there is
     * no such row (including when the static row is requested but is empty).
     */
    public Row getRow(Clustering clustering)
    {
        Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering);
        // Note that for statics, this will never return null, this will return an empty row. However,
        // it's more consistent for this method to return null if we don't really have a static row.
        return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row;
    }

    /**
     * Computes the static row of {@code current} as seen through {@code columns}.
     *
     * @return the filtered static row, or {@link Rows#EMPTY_STATIC_ROW} if no static column is fetched,
     * if there is nothing to return, or if filtering leaves nothing. Never {@code null}.
     */
    private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow)
    {
        DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
        if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive()))
            return Rows.EMPTY_STATIC_ROW;

        Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata);
        return row == null ? Rows.EMPTY_STATIC_ROW : row;
    }

    /**
     * Creates an iterator that looks rows up by clustering against the holder that is current
     * at the time of this call.
     *
     * @param columns the columns to keep in the returned rows.
     * @param reversed selects the direction in which the underlying tree is searched.
     */
    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
    {
        // TODO: we could optimize comparison for "NativeRow" à la #6755
        final Holder current = holder();
        return new SearchIterator<Clustering, Row>()
        {
            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
            private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();

            public boolean hasNext()
            {
                return rawIter.hasNext();
            }

            public Row next(Clustering clustering)
            {
                if (clustering == Clustering.STATIC_CLUSTERING)
                    return staticRow(current, columns, true);

                Row row = rawIter.next(clustering);
                RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);

                // A search iterator only returns a row, so it doesn't allow to directly account for deletion that should apply to the row
                // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion
                // to carry the proper deletion on the row.
                DeletionTime activeDeletion = partitionDeletion;
                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
                    activeDeletion = rt.deletionTime();

                if (row == null)
                    return activeDeletion.isLive() ? null : BTreeRow.emptyDeletedRow(clustering, activeDeletion);

                return row.filter(columns, activeDeletion, true, metadata);
            }
        };
    }

    /**
     * @return an iterator over the whole partition, all columns, in forward order.
     */
    public UnfilteredRowIterator unfilteredIterator()
    {
        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false);
    }

    /**
     * Same as {@link #unfilteredIterator(Holder, ColumnFilter, Slices, boolean)} applied to the current holder.
     */
    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed)
    {
        return unfilteredIterator(holder(), selection, slices, reversed);
    }

    /**
     * Creates an iterator over the rows and range tombstones of {@code current} covered by {@code slices}.
     *
     * @param current the holder to read from.
     * @param selection the columns to include.
     * @param slices the slices to iterate; an empty {@code Slices} yields an iterator without rows.
     * @param reversed whether to iterate in reverse order.
     */
    public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed)
    {
        Row staticRow = staticRow(current, selection, false);
        if (slices.size() == 0)
        {
            DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();
            return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed);
        }

        return slices.size() == 1
             ? sliceIterator(selection, slices.get(0), reversed, current, staticRow)
             : new SlicesIterator(selection, slices, reversed, current, staticRow);
    }

    /**
     * Creates the merged row/range-tombstone iterator for a single slice of {@code current}.
     */
    private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow)
    {
        // BOTTOM and TOP are passed to BTree.slice as null bounds
        Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start();
        Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed));
        Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed);

        return merge(rowIter, deleteIter, selection, reversed, current, staticRow);
    }

    /**
     * Wraps a row iterator and a range tombstone iterator in a {@link RowAndDeletionMergeIterator}
     * using the partition deletion and stats of {@code current}.
     */
    private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter,
                                              ColumnFilter selection, boolean reversed, Holder current, Row staticRow)
    {
        return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(),
                                               selection, staticRow, reversed, current.stats,
                                               rowIter, deleteIter,
                                               canHaveShadowedData());
    }

    /**
     * Common base of the iterators below: remembers the holder it was created against and
     * the column selection.
     */
    private abstract class AbstractIterator extends AbstractUnfilteredRowIterator
    {
        final Holder current;
        final ColumnFilter selection;

        private AbstractIterator(ColumnFilter selection, boolean isReversed)
        {
            this(AbstractBTreePartition.this.holder(), selection, isReversed);
        }

        private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed)
        {
            this(current,
                 AbstractBTreePartition.this.staticRow(current, selection, false),
                 selection, isReversed);
        }

        private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed)
        {
            super(AbstractBTreePartition.this.metadata,
                  AbstractBTreePartition.this.partitionKey,
                  current.deletionInfo.getPartitionDeletion(),
                  AbstractBTreePartition.this.columns,
                  staticRow,
                  isReversed,
                  current.stats);
            this.current = current;
            this.selection = selection;
        }
    }

    /**
     * Iterates over several slices, one after the other, all read from the same holder.
     */
    public class SlicesIterator extends AbstractIterator
    {
        private final Slices slices;

        // number of slices already started
        private int idx;
        // iterator over the slice being consumed, or null if the next slice has yet to be opened
        private Iterator<Unfiltered> currentSlice;

        private SlicesIterator(ColumnFilter selection,
                               Slices slices,
                               boolean isReversed,
                               Holder current,
                               Row staticRow)
        {
            super(current, staticRow, selection, isReversed);
            this.slices = slices;
        }

        protected Unfiltered computeNext()
        {
            while (true)
            {
                if (currentSlice == null)
                {
                    if (idx >= slices.size())
                        return endOfData();

                    // in reverse order, slices are visited from the last one to the first one
                    int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx;
                    // the static row is already carried by this iterator, so the per-slice iterators get the empty one
                    currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
                    idx++;
                }

                if (currentSlice.hasNext())
                    return currentSlice.next();

                currentSlice = null;
            }
        }
    }

    /**
     * Iterator over the whole partition that can additionally be asked for individual slices.
     */
    public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator
    {
        // lazily created on the first call to computeNext()
        private Iterator<Unfiltered> iterator;

        protected SliceableIterator(ColumnFilter selection, boolean isReversed)
        {
            super(selection, isReversed);
        }

        protected Unfiltered computeNext()
        {
            // Iterate over the holder captured at construction rather than re-reading holder(): the static row,
            // partition deletion and stats exposed by this iterator, as well as slice(), all come from 'current'.
            if (iterator == null)
                iterator = unfilteredIterator(current, selection, Slices.ALL, isReverseOrder);
            if (!iterator.hasNext())
                return endOfData();
            return iterator.next();
        }

        public Iterator<Unfiltered> slice(Slice slice)
        {
            return sliceIterator(selection, slice, isReverseOrder, current, staticRow);
        }
    }

    /**
     * @return a new {@link SliceableIterator} over the current holder for the given columns and direction.
     */
    public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed)
    {
        return new SliceableIterator(columns, reversed);
    }

    /**
     * @return a sliceable iterator over all columns, in forward order.
     */
    protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator()
    {
        return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false);
    }

    /**
     * Consumes {@code iterator} into a new holder: rows go to the tree, range tombstone markers
     * to the deletion info. The static row and stats are taken from the iterator itself.
     * This method does not close {@code iterator}.
     *
     * @param iterator the content to gather.
     * @param initialRowCapacity sizing hint handed to the tree builder.
     */
    protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity)
    {
        CFMetaData metadata = iterator.metadata();
        boolean reversed = iterator.isReverseOrder();

        BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
        builder.auto(false);
        MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed);

        while (iterator.hasNext())
        {
            Unfiltered unfiltered = iterator.next();
            if (unfiltered.kind() == Unfiltered.Kind.ROW)
                builder.add((Row)unfiltered);
            else
                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
        }

        // rows were added in iteration order; reverse them when the source iterated in reverse
        if (reversed)
            builder.reverse();

        return new Holder(builder.build(), deletionBuilder.build(), iterator.staticRow(), iterator.stats());
    }

    /**
     * Consumes {@code rows} into a new holder that uses {@code live} as its deletion info.
     * This method does not close {@code rows}.
     *
     * @param rows the rows to gather.
     * @param live the deletion info to store in the holder.
     * @param buildEncodingStats whether to collect encoding stats from the gathered content;
     * otherwise {@link EncodingStats#NO_STATS} is used.
     * @param initialRowCapacity sizing hint handed to the tree builder.
     */
    // live must (as the name suggests) not contain any deletion information
    protected static Holder build(RowIterator rows, DeletionInfo live, boolean buildEncodingStats, int initialRowCapacity)
    {
        CFMetaData metadata = rows.metadata();
        boolean reversed = rows.isReverseOrder();

        BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity);
        builder.auto(false);
        while (rows.hasNext())
        {
            Row row = rows.next();
            builder.add(row);
        }

        // rows were added in iteration order; reverse them when the source iterated in reverse
        if (reversed)
            builder.reverse();

        Row staticRow = rows.staticRow();
        Object[] tree = builder.build();
        EncodingStats stats = buildEncodingStats ? EncodingStats.Collector.collect(staticRow, BTree.iterator(tree), live)
                                                 : EncodingStats.NO_STATS;
        return new Holder(tree, live, staticRow, stats);
    }

    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();

        sb.append(String.format("[%s.%s] key=%s columns=%s",
                                metadata.ksName,
                                metadata.cfName,
                                metadata.getKeyValidator().getString(partitionKey().getKey()),
                                columns));

        // read the static row once so that the test and the printed row are the same object
        Row staticRow = staticRow();
        if (staticRow != Rows.EMPTY_STATIC_ROW)
            sb.append("\n ").append(staticRow.toString(metadata));

        for (Row row : this)
            sb.append("\n ").append(row.toString(metadata));

        return sb.toString();
    }

    /**
     * @return the number of rows in the tree of the current holder.
     */
    public int rowCount()
    {
        return BTree.size(holder().tree);
    }

    /**
     * @return an iterator over the rows of the current holder.
     */
    public Iterator<Row> iterator()
    {
        return BTree.<Row>iterator(holder().tree);
    }

    /**
     * @return the row at the last index of the tree, or {@code null} if the tree is empty.
     */
    public Row lastRow()
    {
        Object[] tree = holder().tree;
        if (BTree.isEmpty(tree))
            return null;

        return BTree.findByIndex(tree, BTree.size(tree) - 1);
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java deleted file mode 100644 index 0b218f5..0000000 --- a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import java.util.*; - -import com.google.common.collect.Lists; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.utils.SearchIterator; - -/** - * Abstract common class for all non-thread safe Partition implementations. 
- */ -public abstract class AbstractThreadUnsafePartition implements Partition, Iterable<Row> -{ - protected final CFMetaData metadata; - protected final DecoratedKey key; - - protected final PartitionColumns columns; - - protected final List<Row> rows; - - protected AbstractThreadUnsafePartition(CFMetaData metadata, - DecoratedKey key, - PartitionColumns columns, - List<Row> rows) - { - this.metadata = metadata; - this.key = key; - this.columns = columns; - this.rows = rows; - } - - public CFMetaData metadata() - { - return metadata; - } - - public DecoratedKey partitionKey() - { - return key; - } - - public DeletionTime partitionLevelDeletion() - { - return deletionInfo().getPartitionDeletion(); - } - - public PartitionColumns columns() - { - return columns; - } - - public abstract Row staticRow(); - - protected abstract boolean canHaveShadowedData(); - - /** - * The deletion info for the partition update. - * - * Note: do not cast the result to a {@code MutableDeletionInfo} to modify it! - * - * @return the deletion info for the partition update for use as read-only. - */ - public abstract DeletionInfo deletionInfo(); - - public int rowCount() - { - return rows.size(); - } - - public boolean isEmpty() - { - return deletionInfo().isLive() && rows.isEmpty() && staticRow().isEmpty(); - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(); - CFMetaData metadata = metadata(); - sb.append(String.format("Partition[%s.%s] key=%s columns=%s%s", - metadata().ksName, - metadata().cfName, - metadata().getKeyValidator().getString(partitionKey().getKey()), - columns(), - deletionInfo().isLive() ? 
"" : " " + deletionInfo())); - - if (staticRow() != Rows.EMPTY_STATIC_ROW) - sb.append("\n ").append(staticRow().toString(metadata, true)); - - // We use createRowIterator() directly instead of iterator() because that avoids - // sorting for PartitionUpdate (which inherit this method) and that is useful because - // 1) it can help with debugging and 2) we can't write after sorting but we want to - // be able to print an update while we build it (again for debugging) - for (Row row : this) - sb.append("\n ").append(row.toString(metadata, true)); - - return sb.toString(); - } - - public Row getRow(Clustering clustering) - { - Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); - // Note that for statics, this will never return null, this will return an empty row. However, - // it's more consistent for this method to return null if we don't really have a static row. - return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; - } - - /** - * Returns an iterator that iterators over the rows of this update in clustering order. - * - * @return an iterator over the rows of this partition. - */ - public Iterator<Row> iterator() - { - return rows.iterator(); - } - - public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed) - { - final RowSearcher searcher = reversed ? new ReverseRowSearcher() : new ForwardRowSearcher(); - return new SearchIterator<Clustering, Row>() - { - public boolean hasNext() - { - return !searcher.isDone(); - } - - public Row next(Clustering clustering) - { - if (clustering == Clustering.STATIC_CLUSTERING) - { - Row staticRow = staticRow(); - return staticRow.isEmpty() || columns.fetchedColumns().statics.isEmpty() - ? 
Rows.EMPTY_STATIC_ROW - : staticRow.filter(columns, partitionLevelDeletion(), true, metadata); - } - - Row row = searcher.search(clustering); - RangeTombstone rt = deletionInfo().rangeCovering(clustering); - - // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to to row - // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion - // to carry the proper deletion on the row. - DeletionTime activeDeletion = partitionLevelDeletion(); - if (rt != null && rt.deletionTime().supersedes(activeDeletion)) - activeDeletion = rt.deletionTime(); - - if (row == null) - return activeDeletion.isLive() ? null : BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion); - - return row.filter(columns, activeDeletion, true, metadata); - } - }; - } - - public UnfilteredRowIterator unfilteredIterator() - { - return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); - } - - public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed) - { - return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed)); - } - - protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() - { - return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false); - } - - public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed) - { - return new SliceableIterator(this, selection, reversed); - } - - /** - * Simple binary search for a given row (in the rows list). - * - * The return value has the exact same meaning that the one of Collections.binarySearch() but - * we don't use the later because we're searching for a 'Clustering' in an array of 'Row' (and while - * both are Clusterable, it's slightly faster to use the 'Clustering' comparison (see comment on - * ClusteringComparator.rowComparator())). 
- */ - private int binarySearch(Clustering clustering, int fromIndex, int toIndex) - { - ClusteringComparator comparator = metadata().comparator; - int low = fromIndex; - int mid = toIndex; - int high = mid - 1; - int result = -1; - while (low <= high) - { - mid = (low + high) >> 1; - if ((result = comparator.compare(clustering, rows.get(mid).clustering())) > 0) - low = mid + 1; - else if (result == 0) - return mid; - else - high = mid - 1; - } - return -mid - (result < 0 ? 1 : 2); - } - - private class SliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator - { - private final ColumnFilter columns; - private RowSearcher searcher; - - private Iterator<Unfiltered> iterator; - - private SliceableIterator(AbstractThreadUnsafePartition partition, ColumnFilter columns, boolean isReverseOrder) - { - super(partition.metadata(), - partition.partitionKey(), - partition.partitionLevelDeletion(), - columns.fetchedColumns(), - partition.staticRow().isEmpty() ? Rows.EMPTY_STATIC_ROW : partition.staticRow().filter(columns, partition.partitionLevelDeletion(), false, partition.metadata()), - isReverseOrder, - partition.stats()); - this.columns = columns; - } - - protected Unfiltered computeNext() - { - if (iterator == null) - iterator = merge(isReverseOrder ? Lists.reverse(rows).iterator(): iterator(), deletionInfo().rangeIterator(isReverseOrder())); - - return iterator.hasNext() ? iterator.next() : endOfData(); - } - - public Iterator<Unfiltered> slice(Slice slice) - { - if (searcher == null) - searcher = isReverseOrder() ? 
new ReverseRowSearcher() : new ForwardRowSearcher(); - return merge(searcher.slice(slice), deletionInfo().rangeIterator(slice, isReverseOrder())); - } - - private Iterator<Unfiltered> merge(Iterator<Row> rows, Iterator<RangeTombstone> ranges) - { - return new RowAndDeletionMergeIterator(metadata, - partitionKey, - partitionLevelDeletion, - columns, - staticRow(), - isReverseOrder(), - stats(), - rows, - ranges, - canHaveShadowedData()); - } - } - - /** - * Utility class to search for rows or slice of rows in order. - */ - private abstract class RowSearcher - { - public abstract boolean isDone(); - - public abstract Row search(Clustering name); - - public abstract Iterator<Row> slice(Slice slice); - - protected int search(Clustering clustering, int from, int to) - { - return binarySearch(clustering, from, to); - } - - protected int search(Slice.Bound bound, int from, int to) - { - return Collections.binarySearch(rows.subList(from, to), bound, metadata.comparator); - } - } - - private class ForwardRowSearcher extends RowSearcher - { - private int nextIdx = 0; - - public boolean isDone() - { - return nextIdx >= rows.size(); - } - - public Row search(Clustering name) - { - if (isDone()) - return null; - - int idx = search(name, nextIdx, rows.size()); - if (idx < 0) - { - nextIdx = -idx - 1; - return null; - } - else - { - nextIdx = idx + 1; - return rows.get(idx); - } - } - - public Iterator<Row> slice(Slice slice) - { - // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will - // be a match, so we save from testing for it. 
- - // since the binary search starts from nextIdx, the position returned will be an offset from nextIdx; to - // get an absolute position, add nextIdx back in - int searchResult = search(slice.start(), nextIdx, rows.size()); - final int start = nextIdx + (-searchResult - 1); // First index to include - - if (start >= rows.size()) - return Collections.emptyIterator(); - - // similarly, add start to the returned position - searchResult = search(slice.end(), start, rows.size()); - final int end = start + (-searchResult - 1); // First index to exclude - - // Remember the end to speed up potential further slice search - nextIdx = end; - - if (start >= end) - return Collections.emptyIterator(); - - return rows.subList(start, end).iterator(); - } - } - - private class ReverseRowSearcher extends RowSearcher - { - private int nextIdx = rows.size() - 1; - - public boolean isDone() - { - return nextIdx < 0; - } - - public Row search(Clustering name) - { - if (isDone()) - return null; - - int idx = search(name, 0, nextIdx); - if (idx < 0) - { - // The insertion point is the first element greater than name, so we want start from the previous one next time - nextIdx = -idx - 2; - return null; - } - else - { - nextIdx = idx - 1; - return rows.get(idx); - } - } - - public Iterator<Row> slice(Slice slice) - { - // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will - // be a match, so we save from testing for it. 
- - // The insertion point is the first element greater than slice.end(), so we want the previous index - final int start = -search(slice.end(), 0, nextIdx + 1) - 2; // First index to include - if (start < 0) - return Collections.emptyIterator(); - - final int end = -search(slice.start(), 0, start + 1) - 2; // First index to exclude - - // Remember the end to speed up potential further slice search - nextIdx = end; - - if (start < end) - return Collections.emptyIterator(); - - return Lists.reverse(rows.subList(end+1, start+1)).iterator(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java deleted file mode 100644 index fab8591..0000000 --- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java +++ /dev/null @@ -1,294 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import java.io.IOException; -import java.util.*; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.DataLimits; -import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; - -public class ArrayBackedCachedPartition extends ArrayBackedPartition implements CachedPartition -{ - private final int createdAtInSec; - - private final int cachedLiveRows; - private final int rowsWithNonExpiringCells; - - private final int nonTombstoneCellCount; - private final int nonExpiringLiveCells; - - private ArrayBackedCachedPartition(CFMetaData metadata, - DecoratedKey partitionKey, - PartitionColumns columns, - Row staticRow, - List<Row> rows, - DeletionInfo deletionInfo, - EncodingStats stats, - int createdAtInSec, - int cachedLiveRows, - int rowsWithNonExpiringCells, - int nonTombstoneCellCount, - int nonExpiringLiveCells) - { - super(metadata, partitionKey, columns, staticRow, rows, deletionInfo, stats); - this.createdAtInSec = createdAtInSec; - this.cachedLiveRows = cachedLiveRows; - this.rowsWithNonExpiringCells = rowsWithNonExpiringCells; - this.nonTombstoneCellCount = nonTombstoneCellCount; - this.nonExpiringLiveCells = nonExpiringLiveCells; - } - - /** - * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. - * - * Warning: Note that this method does not close the provided iterator and it is - * up to the caller to do so. - * - * @param iterator the iterator got gather in memory. - * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. - * @return the created partition. 
- */ - public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec) - { - return create(iterator, 16, nowInSec); - } - - /** - * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. - * - * Warning: Note that this method does not close the provided iterator and it is - * up to the caller to do so. - * - * @param iterator the iterator got gather in memory. - * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally - * correspond or be a good estimation of the number or rows in {@code iterator}. - * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. - * @return the created partition. - */ - public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec) - { - CFMetaData metadata = iterator.metadata(); - boolean reversed = iterator.isReverseOrder(); - - List<Row> rows = new ArrayList<>(initialRowCapacity); - MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); - - int cachedLiveRows = 0; - int rowsWithNonExpiringCells = 0; - - int nonTombstoneCellCount = 0; - int nonExpiringLiveCells = 0; - - while (iterator.hasNext()) - { - Unfiltered unfiltered = iterator.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - { - Row row = (Row)unfiltered; - rows.add(row); - - // Collect stats - if (row.hasLiveData(nowInSec)) - ++cachedLiveRows; - - boolean hasNonExpiringCell = false; - for (Cell cell : row.cells()) - { - if (!cell.isTombstone()) - { - ++nonTombstoneCellCount; - if (!cell.isExpiring()) - { - hasNonExpiringCell = true; - ++nonExpiringLiveCells; - } - } - } - - if (hasNonExpiringCell) - ++rowsWithNonExpiringCells; - } - else - { - deletionBuilder.add((RangeTombstoneMarker)unfiltered); - } - } - - if (reversed) - Collections.reverse(rows); - - return 
new ArrayBackedCachedPartition(metadata, - iterator.partitionKey(), - iterator.columns(), - iterator.staticRow(), - rows, - deletionBuilder.build(), - iterator.stats(), - nowInSec, - cachedLiveRows, - rowsWithNonExpiringCells, - nonTombstoneCellCount, - nonExpiringLiveCells); - } - - public Row lastRow() - { - if (rows.isEmpty()) - return null; - - return rows.get(rows.size() - 1); - } - - /** - * The number of rows that were live at the time the partition was cached. - * - * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this. - * - * @return the number of rows in this partition that were live at the time the - * partition was cached (this can be different from the number of live rows now - * due to expiring cells). - */ - public int cachedLiveRows() - { - return cachedLiveRows; - } - - /** - * The number of rows in this cached partition that have at least one non-expiring - * non-deleted cell. - * - * Note that this is generally not a very meaningful number, but this is used by - * {@link DataLimits#hasEnoughLiveData} as an optimization. - * - * @return the number of row that have at least one non-expiring non-deleted cell. 
- */ - public int rowsWithNonExpiringCells() - { - return rowsWithNonExpiringCells; - } - - public int nonTombstoneCellCount() - { - return nonTombstoneCellCount; - } - - public int nonExpiringLiveCells() - { - return nonExpiringLiveCells; - } - - static class Serializer implements ISerializer<CachedPartition> - { - public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException - { - int version = MessagingService.current_version; - - assert partition instanceof ArrayBackedCachedPartition; - ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition; - - out.writeInt(p.createdAtInSec); - out.writeInt(p.cachedLiveRows); - out.writeInt(p.rowsWithNonExpiringCells); - out.writeInt(p.nonTombstoneCellCount); - out.writeInt(p.nonExpiringLiveCells); - CFMetaData.serializer.serialize(partition.metadata(), out, version); - try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) - { - UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, p.rowCount()); - } - } - - public CachedPartition deserialize(DataInputPlus in) throws IOException - { - int version = MessagingService.current_version; - - // Note that it would be slightly simpler to just do - // ArrayBackedCachedPiartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...)); - // However deserializing the header separatly is not a lot harder and allows us to: - // 1) get the capacity of the partition so we can size it properly directly - // 2) saves the creation of a temporary iterator: rows are directly written to the partition, which - // is slightly faster. 
- - int createdAtInSec = in.readInt(); - int cachedLiveRows = in.readInt(); - int rowsWithNonExpiringCells = in.readInt(); - int nonTombstoneCellCount = in.readInt(); - int nonExpiringLiveCells = in.readInt(); - - CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); - UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL); - assert !header.isReversed && header.rowEstimate >= 0; - - MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, false); - List<Row> rows = new ArrayList<>(header.rowEstimate); - - try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header)) - { - while (partition.hasNext()) - { - Unfiltered unfiltered = partition.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - rows.add((Row)unfiltered); - else - deletionBuilder.add((RangeTombstoneMarker)unfiltered); - } - } - - return new ArrayBackedCachedPartition(metadata, - header.key, - header.sHeader.columns(), - header.staticRow, - rows, - deletionBuilder.build(), - header.sHeader.stats(), - createdAtInSec, - cachedLiveRows, - rowsWithNonExpiringCells, - nonTombstoneCellCount, - nonExpiringLiveCells); - - } - - public long serializedSize(CachedPartition partition) - { - int version = MessagingService.current_version; - - assert partition instanceof ArrayBackedCachedPartition; - ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition; - - try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) - { - return TypeSizes.sizeof(p.createdAtInSec) - + TypeSizes.sizeof(p.cachedLiveRows) - + TypeSizes.sizeof(p.rowsWithNonExpiringCells) - + TypeSizes.sizeof(p.nonTombstoneCellCount) - + TypeSizes.sizeof(p.nonExpiringLiveCells) - + CFMetaData.serializer.serializedSize(partition.metadata(), 
version) - + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, MessagingService.current_version, p.rowCount()); - } - } - } -} - http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java deleted file mode 100644 index 79c65dc..0000000 --- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.partitions; - -import java.util.*; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; - -public class ArrayBackedPartition extends AbstractThreadUnsafePartition -{ - private final Row staticRow; - private final DeletionInfo deletionInfo; - private final EncodingStats stats; - - protected ArrayBackedPartition(CFMetaData metadata, - DecoratedKey partitionKey, - PartitionColumns columns, - Row staticRow, - List<Row> rows, - DeletionInfo deletionInfo, - EncodingStats stats) - { - super(metadata, partitionKey, columns, rows); - this.staticRow = staticRow; - this.deletionInfo = deletionInfo; - this.stats = stats; - } - - /** - * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator. - * - * Warning: Note that this method does not close the provided iterator and it is - * up to the caller to do so. - * - * @param iterator the iterator to gather in memory. - * @return the created partition. - */ - public static ArrayBackedPartition create(UnfilteredRowIterator iterator) - { - return create(iterator, 16); - } - - /** - * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator. - * - * Warning: Note that this method does not close the provided iterator and it is - * up to the caller to do so. - * - * @param iterator the iterator to gather in memory. - * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally - * correspond or be a good estimation of the number or rows in {@code iterator}. - * @return the created partition. 
- */ - public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity) - { - CFMetaData metadata = iterator.metadata(); - boolean reversed = iterator.isReverseOrder(); - - List<Row> rows = new ArrayList<>(initialRowCapacity); - MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); - - while (iterator.hasNext()) - { - Unfiltered unfiltered = iterator.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - rows.add((Row)unfiltered); - else - deletionBuilder.add((RangeTombstoneMarker)unfiltered); - } - - if (reversed) - Collections.reverse(rows); - - return new ArrayBackedPartition(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), iterator.stats()); - } - - protected boolean canHaveShadowedData() - { - // We only create instances from UnfilteredRowIterator that don't have shadowed data - return false; - } - - public Row staticRow() - { - return staticRow; - } - - public DeletionInfo deletionInfo() - { - return deletionInfo; - } - - public EncodingStats stats() - { - return stats; - } -}
