This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 8576e76 Minimize BTree iterator allocations 8576e76 is described below commit 8576e769d13b3e887ea604074641fd4c42af5e8a Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Tue Oct 15 20:10:42 2019 -0700 Minimize BTree iterator allocations Patch by Blake Eggleston; Reviewed by Benedict Elliott Smith for CASSANDRA-15389 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ColumnIndex.java | 6 +- src/java/org/apache/cassandra/db/Columns.java | 9 +- src/java/org/apache/cassandra/db/Mutation.java | 6 +- src/java/org/apache/cassandra/db/ReadResponse.java | 8 +- .../apache/cassandra/db/SerializationHeader.java | 1 + .../cassandra/db/UnfilteredDeserializer.java | 6 +- .../db/columniterator/AbstractSSTableIterator.java | 6 +- .../cassandra/db/commitlog/CommitLogReader.java | 4 +- .../db/partitions/CachedBTreePartition.java | 4 +- .../cassandra/db/partitions/PartitionUpdate.java | 7 +- .../partitions/UnfilteredPartitionIterators.java | 2 +- .../org/apache/cassandra/db/rows/AbstractRow.java | 18 +- .../org/apache/cassandra/db/rows/BTreeRow.java | 103 +++++++---- src/java/org/apache/cassandra/db/rows/Cell.java | 2 +- .../org/apache/cassandra/db/rows/ColumnData.java | 5 + .../cassandra/db/rows/ComplexColumnData.java | 18 +- ...ationHelper.java => DeserializationHelper.java} | 6 +- src/java/org/apache/cassandra/db/rows/Row.java | 23 ++- src/java/org/apache/cassandra/db/rows/Rows.java | 72 ++++--- .../cassandra/db/rows/SerializationHelper.java | 134 +++----------- .../db/rows/UnfilteredRowIteratorSerializer.java | 21 ++- .../cassandra/db/rows/UnfilteredSerializer.java | 95 +++++----- .../db/streaming/CassandraStreamReader.java | 4 +- .../io/sstable/SSTableIdentityIterator.java | 4 +- .../io/sstable/SSTableSimpleIterator.java | 12 +- .../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +- .../org/apache/cassandra/service/paxos/Commit.java | 2 +- .../{WrappedInt.java => BiLongAccumulator.java} | 32 
+--- .../{WrappedInt.java => LongAccumulator.java} | 32 +--- .../org/apache/cassandra/utils/btree/BTree.java | 206 ++++++++++++--------- .../cassandra/utils/btree/BTreeSearchIterator.java | 4 + .../utils/btree/LeafBTreeSearchIterator.java | 29 ++- .../db/commitlog/CommitLogStressTest.java | 5 +- .../org/apache/cassandra/db/ReadCommandTest.java | 3 +- .../org/apache/cassandra/db/RowIndexEntryTest.java | 7 +- .../cassandra/db/commitlog/CDCTestReplayer.java | 4 +- .../db/commitlog/CommitLogTestReplayer.java | 4 +- .../apache/cassandra/utils/btree/BTreeTest.java | 55 +++++- 39 files changed, 492 insertions(+), 473 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 96ce286..c1e5aeb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Minimize BTree iterator allocations (CASSANDRA-15389) * Add client request size server metrics (CASSANDRA-15704) * Add additional logging around FileUtils and compaction leftover cleanup (CASSANDRA-15705) * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh (CASSANDRA-15711) diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java index 74ad264..e11f784 100644 --- a/src/java/org/apache/cassandra/db/ColumnIndex.java +++ b/src/java/org/apache/cassandra/db/ColumnIndex.java @@ -53,6 +53,7 @@ public class ColumnIndex public int columnIndexCount; private int[] indexOffsets; + private final SerializationHelper helper; private final SerializationHeader header; private final int version; private final SequentialWriter writer; @@ -79,6 +80,7 @@ public class ColumnIndex Collection<SSTableFlushObserver> observers, ISerializer<IndexInfo> indexInfoSerializer) { + this.helper = new SerializationHelper(header); this.header = header; this.writer = writer; this.version = version.correspondingMessagingVersion(); @@ -126,7 +128,7 @@ public class ColumnIndex { Row staticRow = iterator.staticRow(); - 
UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version); + UnfilteredSerializer.serializer.serializeStaticRow(staticRow, helper, writer, version); if (!observers.isEmpty()) observers.forEach((o) -> o.nextUnfilteredCluster(staticRow)); } @@ -248,7 +250,7 @@ public class ColumnIndex startPosition = pos; } - UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); + UnfilteredSerializer.serializer.serialize(unfiltered, helper, writer, pos - previousRowStart, version); // notify observers about each new row if (!observers.isEmpty()) diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index f56072e..fe13919 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -53,7 +53,7 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle public static final Serializer serializer = new Serializer(); public static final Columns NONE = new Columns(BTree.empty(), 0); - private static final ColumnMetadata FIRST_COMPLEX_STATIC = + public static final ColumnMetadata FIRST_COMPLEX_STATIC = new ColumnMetadata("", "", ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, UTF8Type.instance), @@ -61,7 +61,7 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC); - private static final ColumnMetadata FIRST_COMPLEX_REGULAR = + public static final ColumnMetadata FIRST_COMPLEX_REGULAR = new ColumnMetadata("", "", ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, UTF8Type.instance), @@ -382,11 +382,10 @@ public class Columns extends AbstractCollection<ColumnMetadata> implements Colle /** * Apply a function to each column definition in forwards or reversed order. 
* @param function - * @param reversed */ - public void apply(Consumer<ColumnMetadata> function, boolean reversed) + public void apply(Consumer<ColumnMetadata> function) { - BTree.apply(columns, function, reversed); + BTree.apply(columns, function); } @Override diff --git a/src/java/org/apache/cassandra/db/Mutation.java b/src/java/org/apache/cassandra/db/Mutation.java index 22c4ed8..3d27ef3 100644 --- a/src/java/org/apache/cassandra/db/Mutation.java +++ b/src/java/org/apache/cassandra/db/Mutation.java @@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -336,7 +336,7 @@ public class Mutation implements IMutation PartitionUpdate.serializer.serialize(entry.getValue(), out, version); } - public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException { int size = (int)in.readUnsignedVInt(); assert size > 0; @@ -359,7 +359,7 @@ public class Mutation implements IMutation public Mutation deserialize(DataInputPlus in, int version) throws IOException { - return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE); + return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE); } public long serializedSize(Mutation mutation, int version) diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index affbbbe..3f6481d 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -182,7 
+182,7 @@ public abstract class ReadResponse command.getRepairedDataDigest(), command.isRepairedDataDigestConclusive(), MessagingService.current_version, - SerializationHelper.Flag.LOCAL); + DeserializationHelper.Flag.LOCAL); } private static ByteBuffer build(UnfilteredPartitionIterator iter, ColumnFilter selection) @@ -208,7 +208,7 @@ public abstract class ReadResponse boolean isRepairedDigestConclusive, int version) { - super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE); + super(data, repairedDataDigest, isRepairedDigestConclusive, version, DeserializationHelper.Flag.FROM_REMOTE); } } @@ -220,13 +220,13 @@ public abstract class ReadResponse private final ByteBuffer repairedDataDigest; private final boolean isRepairedDigestConclusive; private final int dataSerializationVersion; - private final SerializationHelper.Flag flag; + private final DeserializationHelper.Flag flag; protected DataResponse(ByteBuffer data, ByteBuffer repairedDataDigest, boolean isRepairedDigestConclusive, int dataSerializationVersion, - SerializationHelper.Flag flag) + DeserializationHelper.Flag flag) { super(); this.data = data; diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 15ef268..1c22feb 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.ImmutableList; + import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.TypeParser; diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 84ff691..f9ff1d7 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ 
b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -34,7 +34,7 @@ public class UnfilteredDeserializer { protected final TableMetadata metadata; protected final DataInputPlus in; - protected final SerializationHelper helper; + protected final DeserializationHelper helper; private final ClusteringPrefix.Deserializer clusteringDeserializer; private final SerializationHeader header; @@ -49,7 +49,7 @@ public class UnfilteredDeserializer private UnfilteredDeserializer(TableMetadata metadata, DataInputPlus in, SerializationHeader header, - SerializationHelper helper) + DeserializationHelper helper) { this.metadata = metadata; this.in = in; @@ -62,7 +62,7 @@ public class UnfilteredDeserializer public static UnfilteredDeserializer create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, - SerializationHelper helper) + DeserializationHelper helper) { return new UnfilteredDeserializer(metadata, in, header, helper); } diff --git a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java index cfc7da2..c631f1c 100644 --- a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java @@ -43,7 +43,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator protected final DecoratedKey key; protected final DeletionTime partitionLevelDeletion; protected final ColumnFilter columns; - protected final SerializationHelper helper; + protected final DeserializationHelper helper; protected final Row staticRow; protected final Reader reader; @@ -70,7 +70,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator this.key = key; this.columns = columnFilter; this.slices = slices; - this.helper = new SerializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL, columnFilter); + this.helper = new DeserializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, columnFilter); if (indexEntry == null) { @@ -159,7 +159,7 @@ public abstract class AbstractSSTableIterator implements UnfilteredRowIterator private static Row readStaticRow(SSTableReader sstable, FileDataInput file, - SerializationHelper helper, + DeserializationHelper helper, Columns statics) throws IOException { if (!sstable.header.hasStatic()) diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java index c91841f..5123580 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java @@ -34,7 +34,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason; import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.exceptions.UnknownTableException; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.util.DataInputBuffer; @@ -429,7 +429,7 @@ public class CommitLogReader { mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), - SerializationHelper.Flag.LOCAL); + DeserializationHelper.Flag.LOCAL); // doublecheck that what we read is still] valid for the current schema for (PartitionUpdate upd : mutation.getPartitionUpdates()) upd.validate(); diff --git a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java index 4d9c227..9a2b331 100644 --- 
a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java @@ -177,11 +177,11 @@ public class CachedBTreePartition extends ImmutableBTreePartition implements Cac TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in)); - UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL); + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, DeserializationHelper.Flag.LOCAL); assert !header.isReversed && header.rowEstimate >= 0; Holder holder; - try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header)) + try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, DeserializationHelper.Flag.LOCAL, header)) { holder = ImmutableBTreePartition.build(partition, header.rowEstimate); } diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index ec01fa6..076c975 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -20,8 +20,6 @@ package org.apache.cassandra.db.partitions; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; import java.util.List; import com.google.common.collect.Iterables; @@ -38,7 +36,6 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.utils.FBUtilities; 
import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; @@ -230,7 +227,7 @@ public class PartitionUpdate extends AbstractBTreePartition { return serializer.deserialize(new DataInputBuffer(bytes, true), version, - SerializationHelper.Flag.LOCAL); + DeserializationHelper.Flag.LOCAL); } catch (IOException e) { @@ -636,7 +633,7 @@ public class PartitionUpdate extends AbstractBTreePartition } } - public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + public PartitionUpdate deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException { TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in)); UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag); diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 945bcb4..30b4d9e 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -316,7 +316,7 @@ public abstract class UnfilteredPartitionIterators out.writeBoolean(false); } - public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final TableMetadata metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException + public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final TableMetadata metadata, final ColumnFilter selection, final DeserializationHelper.Flag flag) throws IOException { // Skip now unused isForThrift boolean in.readBoolean(); diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index 
957ffd4..fc90e34 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -24,10 +24,11 @@ import java.util.stream.StreamSupport; import com.google.common.collect.Iterables; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.serializers.MarshalException; /** @@ -66,8 +67,7 @@ public abstract class AbstractRow implements Row deletion().digest(digest); primaryKeyLivenessInfo().digest(digest); - for (ColumnData cd : this) - cd.digest(digest); + apply(ColumnData::digest, digest); } public void validateData(TableMetadata metadata) @@ -93,15 +93,7 @@ public abstract class AbstractRow implements Row if (deletion().time().localDeletionTime() < 0) throw new MarshalException("A local deletion time should not be negative in '" + metadata + "'"); - for (ColumnData cd : this) - try - { - cd.validate(); - } - catch (Exception e) - { - throw new MarshalException("data for '" + cd.column.debugString() + "', " + cd + " in '" + metadata + "' didn't validate", e); - } + apply(cd -> cd.validate()); } public boolean hasInvalidDeletions() diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index dc3219a..6689c77 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -19,13 +19,14 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.util.*; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Predicate; import com.google.common.base.Function; import com.google.common.collect.Collections2; -import 
com.google.common.collect.Iterables; import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; @@ -53,6 +54,11 @@ public class BTreeRow extends AbstractRow // The data for each columns present in this row in column sorted order. private final Object[] btree; + private static final ColumnData FIRST_COMPLEX_STATIC = new ComplexColumnData(Columns.FIRST_COMPLEX_STATIC, new Object[0], new DeletionTime(0, 0)); + private static final ColumnData FIRST_COMPLEX_REGULAR = new ComplexColumnData(Columns.FIRST_COMPLEX_REGULAR, new Object[0], new DeletionTime(0, 0)); + private static final Comparator<ColumnData> COLUMN_COMPARATOR = (cd1, cd2) -> cd1.column.compareTo(cd2.column); + + // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones, @@ -90,8 +96,8 @@ public class BTreeRow extends AbstractRow int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time())); if (minDeletionTime != Integer.MIN_VALUE) { - for (ColumnData cd : BTree.<ColumnData>iterable(btree)) - minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); + long result = BTree.<ColumnData>accumulate(btree, (cd, l) -> Math.min(l, minDeletionTime(cd)) , minDeletionTime); + minDeletionTime = Ints.checkedCast(result); } return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); @@ -168,23 +174,49 @@ public class BTreeRow extends AbstractRow return cd.column().isSimple() ? 
minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd); } - public void apply(Consumer<ColumnData> function, boolean reversed) + public void apply(Consumer<ColumnData> function) + { + BTree.apply(btree, function); + } + + public <A> void apply(BiConsumer<A, ColumnData> function, A arg) + { + BTree.apply(btree, function, arg); + } + + public long accumulate(LongAccumulator<ColumnData> accumulator, long initialValue) { - BTree.apply(btree, function, reversed); + return BTree.accumulate(btree, accumulator, initialValue); } - public void apply(Consumer<ColumnData> funtion, com.google.common.base.Predicate<ColumnData> stopCondition, boolean reversed) + public long accumulate(LongAccumulator<ColumnData> accumulator, Comparator<ColumnData> comparator, ColumnData from, long initialValue) { - BTree.apply(btree, funtion, stopCondition, reversed); + return BTree.accumulate(btree, accumulator, comparator, from, initialValue); + } + + public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg, long initialValue) + { + return BTree.accumulate(btree, accumulator, arg, initialValue); + } + + public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue) + { + return BTree.accumulate(btree, accumulator, arg, comparator, from, initialValue); } private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion) { - //we have to wrap this for the lambda - final WrappedInt min = new WrappedInt(Math.min(minDeletionTime(info), minDeletionTime(rowDeletion))); + long min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)); + + min = BTree.<ColumnData>accumulate(btree, (cd, l) -> { + int m = Math.min((int) l, minDeletionTime(cd)); + return m != Integer.MIN_VALUE ? 
m : Long.MAX_VALUE; + }, min); - BTree.<ColumnData>apply(btree, cd -> min.set( Math.min(min.get(), minDeletionTime(cd)) ), cd -> min.get() == Integer.MIN_VALUE, false); - return min.get(); + if (min == Long.MAX_VALUE) + return Integer.MIN_VALUE; + + return Ints.checkedCast(min); } public Clustering clustering() @@ -334,33 +366,19 @@ public class BTreeRow extends AbstractRow public boolean hasComplex() { - // We start by the end cause we know complex columns sort after the simple ones - ColumnData cd = Iterables.getFirst(BTree.<ColumnData>iterable(btree, BTree.Dir.DESC), null); - return cd != null && cd.column.isComplex(); + if (BTree.isEmpty(btree)) + return false; + + int size = BTree.size(btree); + ColumnData last = BTree.findByIndex(btree, size - 1); + return last.column.isComplex(); } public boolean hasComplexDeletion() { - final WrappedBoolean result = new WrappedBoolean(false); - - // We start by the end cause we know complex columns sort before simple ones - apply(c -> {}, cd -> { - if (cd.column.isSimple()) - { - result.set(false); - return true; - } - - if (!((ComplexColumnData) cd).complexDeletion().isLive()) - { - result.set(true); - return true; - } - - return false; - }, true); - - return result.get(); + long result = accumulate((cd, v) -> ((ComplexColumnData) cd).complexDeletion().isLive() ? 0 : Long.MAX_VALUE, + COLUMN_COMPARATOR, isStatic() ? FIRST_COMPLEX_STATIC : FIRST_COMPLEX_REGULAR, 0L); + return result == Long.MAX_VALUE; } public Row markCounterLocalToBeCleared() @@ -375,6 +393,15 @@ public class BTreeRow extends AbstractRow return nowInSec >= minLocalDeletionTime; } + public boolean hasInvalidDeletions() + { + if (primaryKeyLivenessInfo().isExpiring() && (primaryKeyLivenessInfo().ttl() < 0 || primaryKeyLivenessInfo().localExpirationTime() < 0)) + return true; + if (!deletion().time().validate()) + return true; + return accumulate((cd, v) -> cd.hasInvalidDeletions() ? 
Long.MAX_VALUE : v, 0) != 0; + } + /** * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and * all deletion timestamp by {@code newTimestamp - 1}. @@ -440,9 +467,7 @@ public class BTreeRow extends AbstractRow + primaryKeyLivenessInfo.dataSize() + deletion.dataSize(); - for (ColumnData cd : this) - dataSize += cd.dataSize(); - return dataSize; + return Ints.checkedCast(accumulate((cd, v) -> v + cd.dataSize(), dataSize)); } public long unsharedHeapSizeExcludingData() @@ -451,9 +476,7 @@ public class BTreeRow extends AbstractRow + clustering.unsharedHeapSizeExcludingData() + BTree.sizeOfStructureOnHeap(btree); - for (ColumnData cd : this) - heapSize += cd.unsharedHeapSizeExcludingData(); - return heapSize; + return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), heapSize); } public static Row.Builder sortedBuilder() diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index 300bbce..959676a 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -214,7 +214,7 @@ public abstract class Cell extends ColumnData header.getType(column).writeValue(cell.value(), out); } - public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, SerializationHelper helper) throws IOException + public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, DeserializationHelper helper) throws IOException { int flags = in.readUnsignedByte(); boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java b/src/java/org/apache/cassandra/db/rows/ColumnData.java index e5f5550..36aad97 100644 --- a/src/java/org/apache/cassandra/db/rows/ColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java @@ -78,6 +78,11 @@ public abstract 
class ColumnData */ public abstract void digest(Digest digest); + public static void digest(Digest digest, ColumnData cd) + { + cd.digest(digest); + } + /** * Returns a copy of the data where all timestamps for live data have replaced by {@code newTimestamp} and * all deletion timestamp by {@code newTimestamp - 1}. diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index 832167f..5b03504 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -30,8 +30,11 @@ import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.marshal.ByteType; import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.DroppedColumn; +import org.apache.cassandra.utils.BiLongAccumulator; +import org.apache.cassandra.utils.LongAccumulator; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.btree.BTree; @@ -60,11 +63,6 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> this.complexDeletion = complexDeletion; } - public boolean hasCells() - { - return !BTree.isEmpty(cells); - } - public int cellsCount() { return BTree.size(cells); @@ -106,6 +104,16 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> return BTree.iterator(cells, BTree.Dir.DESC); } + public long accumulate(LongAccumulator<Cell> accumulator, long initialValue) + { + return BTree.accumulate(cells, accumulator, initialValue); + } + + public <A> long accumulate(BiLongAccumulator<A, Cell> accumulator, A arg, long initialValue) + { + return BTree.accumulate(cells, accumulator, arg, initialValue); + } + public int dataSize() { int size = complexDeletion.dataSize(); diff 
--git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java similarity index 95% copy from src/java/org/apache/cassandra/db/rows/SerializationHelper.java copy to src/java/org/apache/cassandra/db/rows/DeserializationHelper.java index db23cb8..386e6ef 100644 --- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java +++ b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java @@ -27,7 +27,7 @@ import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.schema.DroppedColumn; -public class SerializationHelper +public class DeserializationHelper { /** * Flag affecting deserialization behavior (this only affect counters in practice). @@ -56,7 +56,7 @@ public class SerializationHelper private DroppedColumn currentDroppedComplex; - public SerializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch) + public DeserializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch) { this.flag = flag; this.version = version; @@ -65,7 +65,7 @@ public class SerializationHelper this.hasDroppedColumns = droppedColumns.size() > 0; } - public SerializationHelper(TableMetadata metadata, int version, Flag flag) + public DeserializationHelper(TableMetadata metadata, int version, Flag flag) { this(metadata, version, flag, null); } diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 2f752b8..ee93da4 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -18,15 +18,16 @@ package org.apache.cassandra.db.rows; import java.util.*; +import java.util.function.BiConsumer; import java.util.function.Consumer; -import com.google.common.base.Predicate; - import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import 
org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.utils.BiLongAccumulator; +import org.apache.cassandra.utils.LongAccumulator; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; @@ -282,12 +283,24 @@ public interface Row extends Unfiltered, Iterable<ColumnData> /** * Apply a function to every column in a row */ - public void apply(Consumer<ColumnData> function, boolean reverse); + public void apply(Consumer<ColumnData> function); + + /** + * Apply a function to every column in a row + */ + public <A> void apply(BiConsumer<A, ColumnData> function, A arg); /** - * Apply a funtion to every column in a row until a stop condition is reached + * Apply an accumulation funtion to every column in a row */ - public void apply(Consumer<ColumnData> function, Predicate<ColumnData> stopCondition, boolean reverse); + + public long accumulate(LongAccumulator<ColumnData> accumulator, long initialValue); + + public long accumulate(LongAccumulator<ColumnData> accumulator, Comparator<ColumnData> comparator, ColumnData from, long initialValue); + + public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg, long initialValue); + + public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue); /** * A row deletion/tombstone. 
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index d62d3b5..58284ac 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -27,7 +27,6 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.utils.MergeIterator; -import org.apache.cassandra.utils.WrappedInt; /** * Static utilities to work on Row objects. @@ -75,6 +74,46 @@ public abstract class Rows return new SimpleBuilders.RowBuilder(metadata, clusteringValues); } + private static class StatsAccumulation + { + private static final long COLUMN_INCR = 1L << 32; + private static final long CELL_INCR = 1L; + + private static long accumulateOnCell(PartitionStatisticsCollector collector, Cell cell, long l) + { + Cells.collectStats(cell, collector); + return l + CELL_INCR; + } + + private static long accumulateOnColumnData(PartitionStatisticsCollector collector, ColumnData cd, long l) + { + if (cd.column().isSimple()) + { + l = accumulateOnCell(collector, (Cell) cd, l) + COLUMN_INCR; + } + else + { + ComplexColumnData complexData = (ComplexColumnData)cd; + collector.update(complexData.complexDeletion()); + int startingCells = unpackCellCount(l); + l = complexData.accumulate(StatsAccumulation::accumulateOnCell, collector, l); + if (unpackCellCount(l) > startingCells) + l += COLUMN_INCR; + } + return l; + } + + private static int unpackCellCount(long v) + { + return (int) (v & 0xFFFFFFFFL); + } + + private static int unpackColumnCount(long v) + { + return (int) (v >>> 32); + } + } + /** * Collect statistics on a given row. 
* @@ -89,35 +128,10 @@ public abstract class Rows collector.update(row.primaryKeyLivenessInfo()); collector.update(row.deletion().time()); - //we have to wrap these for the lambda - final WrappedInt columnCount = new WrappedInt(0); - final WrappedInt cellCount = new WrappedInt(0); - - row.apply(cd -> { - if (cd.column().isSimple()) - { - columnCount.increment(); - cellCount.increment(); - Cells.collectStats((Cell) cd, collector); - } - else - { - ComplexColumnData complexData = (ComplexColumnData)cd; - collector.update(complexData.complexDeletion()); - if (complexData.hasCells()) - { - columnCount.increment(); - for (Cell cell : complexData) - { - cellCount.increment(); - Cells.collectStats(cell, collector); - } - } - } - }, false); + long result = row.accumulate(StatsAccumulation::accumulateOnColumnData, collector, 0); - collector.updateColumnSetPerRow(columnCount.get()); - return cellCount.get(); + collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result)); + return StatsAccumulation.unpackCellCount(result); } /** diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java index db23cb8..dca4240 100644 --- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java +++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java @@ -15,135 +15,43 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.cassandra.db.rows; -import java.nio.ByteBuffer; -import java.util.*; +package org.apache.cassandra.db.rows; +import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.context.CounterContext; -import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.schema.DroppedColumn; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; public class SerializationHelper { - /** - * Flag affecting deserialization behavior (this only affect counters in practice). - * - LOCAL: for deserialization of local data (Expired columns are - * converted to tombstones (to gain disk space)). - * - FROM_REMOTE: for deserialization of data received from remote hosts - * (Expired columns are converted to tombstone and counters have - * their delta cleared) - * - PRESERVE_SIZE: used when no transformation must be performed, i.e, - * when we must ensure that deserializing and reserializing the - * result yield the exact same bytes. Streaming uses this. 
- */ - public enum Flag - { - LOCAL, FROM_REMOTE, PRESERVE_SIZE - } - - private final Flag flag; - public final int version; - - private final ColumnFilter columnsToFetch; - private ColumnFilter.Tester tester; - - private final boolean hasDroppedColumns; - private final Map<ByteBuffer, DroppedColumn> droppedColumns; - private DroppedColumn currentDroppedComplex; - - - public SerializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch) - { - this.flag = flag; - this.version = version; - this.columnsToFetch = columnsToFetch; - this.droppedColumns = metadata.droppedColumns; - this.hasDroppedColumns = droppedColumns.size() > 0; - } - - public SerializationHelper(TableMetadata metadata, int version, Flag flag) - { - this(metadata, version, flag, null); - } + public final SerializationHeader header; + private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null; + private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars = null; - public boolean includes(ColumnMetadata column) + public SerializationHelper(SerializationHeader header) { - return columnsToFetch == null || columnsToFetch.fetches(column); + this.header = header; } - public boolean includes(Cell cell, LivenessInfo rowLiveness) + private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics() { - if (columnsToFetch == null) - return true; - - // During queries, some columns are included even though they are not queried by the user because - // we always need to distinguish between having a row (with potentially only null values) and not - // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not - // actually requested by the user however (canSkipValue), we can skip the full cell if the cell - // timestamp is lower than the row one, because in that case, the row timestamp is enough proof - // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells. 
- ColumnMetadata column = cell.column(); - if (column.isComplex()) - { - if (!includes(cell.path())) - return false; - - return !canSkipValue(cell.path()) || cell.timestamp() >= rowLiveness.timestamp(); - } - else - { - return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp(); - } - } - - public boolean includes(CellPath path) - { - return path == null || tester == null || tester.fetches(path); - } - - public boolean canSkipValue(ColumnMetadata column) - { - return columnsToFetch != null && !columnsToFetch.fetchedColumnIsQueried(column); - } - - public boolean canSkipValue(CellPath path) - { - return path != null && tester != null && !tester.fetchedCellIsQueried(path); - } - - public void startOfComplexColumn(ColumnMetadata column) - { - this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column); - this.currentDroppedComplex = droppedColumns.get(column.name.bytes); - } - - public void endOfComplexColumn() - { - this.tester = null; - } - - public boolean isDropped(Cell cell, boolean isComplex) - { - if (!hasDroppedColumns) - return false; - - DroppedColumn dropped = isComplex ? 
currentDroppedComplex : droppedColumns.get(cell.column().name.bytes); - return dropped != null && cell.timestamp() <= dropped.droppedTime; + if (statics == null) + statics = header.columns().statics.iterator(); + return statics; } - public boolean isDroppedComplexDeletion(DeletionTime complexDeletion) + private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars() { - return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime; + if (regulars == null) + regulars = header.columns().regulars.iterator(); + return regulars; } - public ByteBuffer maybeClearCounterValue(ByteBuffer value) + public SearchIterator<ColumnMetadata, ColumnMetadata> iterator(boolean isStatic) { - return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)) - ? CounterContext.instance().clearAllLocal(value) - : value; + BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator = isStatic ? statics() : regulars(); + iterator.rewind(); + return iterator; } } diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 7cac5e6..df67754 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -123,18 +123,19 @@ public class UnfilteredRowIteratorSerializer out.writeByte((byte)flags); SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic); + SerializationHelper helper = new SerializationHelper(header); if (!partitionDeletion.isLive()) header.writeDeletionTime(partitionDeletion, out); if (hasStatic) - UnfilteredSerializer.serializer.serialize(staticRow, header, out, version); + UnfilteredSerializer.serializer.serialize(staticRow, helper, out, version); if (rowEstimate >= 0) out.writeUnsignedVInt(rowEstimate); while (iterator.hasNext()) - 
UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version); + UnfilteredSerializer.serializer.serialize(iterator.next(), helper, out, version); UnfilteredSerializer.serializer.writeEndOfPartition(out); } @@ -147,6 +148,8 @@ public class UnfilteredRowIteratorSerializer iterator.columns(), iterator.stats()); + SerializationHelper helper = new SerializationHelper(header); + assert rowEstimate >= 0; long size = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey()) @@ -165,19 +168,19 @@ public class UnfilteredRowIteratorSerializer size += header.deletionTimeSerializedSize(partitionDeletion); if (hasStatic) - size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version); + size += UnfilteredSerializer.serializer.serializedSize(staticRow, helper, version); if (rowEstimate >= 0) size += TypeSizes.sizeofUnsignedVInt(rowEstimate); while (iterator.hasNext()) - size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version); + size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), helper, version); size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(); return size; } - public Header deserializeHeader(TableMetadata metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + public Header deserializeHeader(TableMetadata metadata, ColumnFilter selection, DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException { DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); @@ -198,18 +201,18 @@ public class UnfilteredRowIteratorSerializer Row staticRow = Rows.EMPTY_STATIC_ROW; if (hasStatic) - staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag)); + staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new 
DeserializationHelper(metadata, version, flag)); int rowEstimate = hasRowEstimate ? (int)in.readUnsignedVInt() : -1; return new Header(header, key, isReversed, false, partitionDeletion, staticRow, rowEstimate); } - public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, SerializationHelper.Flag flag, Header header) throws IOException + public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, DeserializationHelper.Flag flag, Header header) throws IOException { if (header.isEmpty) return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed); - final SerializationHelper helper = new SerializationHelper(metadata, version, flag); + final DeserializationHelper helper = new DeserializationHelper(metadata, version, flag); final SerializationHeader sHeader = header.sHeader; return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats()) { @@ -230,7 +233,7 @@ public class UnfilteredRowIteratorSerializer }; } - public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException + public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, ColumnFilter selection, DeserializationHelper.Flag flag) throws IOException { return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag)); } diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 7b48652..a5fad14 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -19,8 +19,6 @@ package org.apache.cassandra.db.rows; import java.io.IOException; -import 
com.google.common.collect.Collections2; - import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.db.*; @@ -118,40 +116,41 @@ public class UnfilteredSerializer @Deprecated private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the row deletion is shadowable. If there is no extended flag (or no row deletion), the deletion is assumed not shadowable. - public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version) + public void serialize(Unfiltered unfiltered, SerializationHelper helper, DataOutputPlus out, int version) throws IOException { - assert !header.isForSSTable(); - serialize(unfiltered, header, out, 0, version); + assert !helper.header.isForSSTable(); + serialize(unfiltered, helper, out, 0, version); } - public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) + public void serialize(Unfiltered unfiltered, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version); + serialize((RangeTombstoneMarker) unfiltered, helper, out, previousUnfilteredSize, version); } else { - serialize((Row) unfiltered, header, out, previousUnfilteredSize, version); + serialize((Row) unfiltered, helper, out, previousUnfilteredSize, version); } } - public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version) + public void serializeStaticRow(Row row, SerializationHelper helper, DataOutputPlus out, int version) throws IOException { assert row.isStatic(); - serialize(row, header, out, 0, version); + serialize(row, helper, out, 0, version); } - private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long 
previousUnfilteredSize, int version) + private void serialize(Row row, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { int flags = 0; int extendedFlags = 0; boolean isStatic = row.isStatic(); + SerializationHeader header = helper.header; Columns headerColumns = header.columns(isStatic); LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); @@ -191,7 +190,7 @@ public class UnfilteredSerializer { try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get()) { - serializeRowBody(row, flags, header, dob); + serializeRowBody(row, flags, helper, dob); out.writeUnsignedVInt(dob.position() + TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize)); // We write the size of the previous unfiltered to make reverse queries more efficient (and simpler). @@ -202,16 +201,17 @@ public class UnfilteredSerializer } else { - serializeRowBody(row, flags, header, out); + serializeRowBody(row, flags, helper, out); } } @Inline - private void serializeRowBody(Row row, int flags, SerializationHeader header, DataOutputPlus out) + private void serializeRowBody(Row row, int flags, SerializationHelper helper, DataOutputPlus out) throws IOException { boolean isStatic = row.isStatic(); + SerializationHeader header = helper.header; Columns headerColumns = header.columns(isStatic); LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); @@ -229,7 +229,7 @@ public class UnfilteredSerializer if ((flags & HAS_ALL_COLUMNS) == 0) Columns.serializer.serializeSubset(row.columns(), headerColumns, out); - SearchIterator<ColumnMetadata, ColumnMetadata> si = headerColumns.iterator(); + SearchIterator<ColumnMetadata, ColumnMetadata> si = helper.iterator(isStatic); try { @@ -253,7 +253,7 @@ public class UnfilteredSerializer { throw new WrappedException(e); } - }, false); + }); } catch (WrappedException e) { @@ -275,9 +275,10 @@ public class UnfilteredSerializer 
Cell.serializer.serialize(cell, column, out, rowLiveness, header); } - private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) + private void serialize(RangeTombstoneMarker marker, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version) throws IOException { + SerializationHeader header = helper.header; out.writeByte((byte)IS_MARKER); ClusteringBoundOrBoundary.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes()); @@ -299,20 +300,20 @@ public class UnfilteredSerializer } } - public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version) + public long serializedSize(Unfiltered unfiltered, SerializationHelper helper, int version) { - assert !header.isForSSTable(); - return serializedSize(unfiltered, header, 0, version); + assert !helper.header.isForSSTable(); + return serializedSize(unfiltered, helper, 0, version); } - public long serializedSize(Unfiltered unfiltered, SerializationHeader header, long previousUnfilteredSize,int version) + public long serializedSize(Unfiltered unfiltered, SerializationHelper helper, long previousUnfilteredSize,int version) { return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER - ? serializedSize((RangeTombstoneMarker) unfiltered, header, previousUnfilteredSize, version) - : serializedSize((Row) unfiltered, header, previousUnfilteredSize, version); + ? 
serializedSize((RangeTombstoneMarker) unfiltered, helper, previousUnfilteredSize, version) + : serializedSize((Row) unfiltered, helper, previousUnfilteredSize, version); } - private long serializedSize(Row row, SerializationHeader header, long previousUnfilteredSize, int version) + private long serializedSize(Row row, SerializationHelper helper, long previousUnfilteredSize, int version) { long size = 1; // flags @@ -320,15 +321,16 @@ public class UnfilteredSerializer size += 1; // extended flags if (!row.isStatic()) - size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes()); + size += Clustering.serializer.serializedSize(row.clustering(), version, helper.header.clusteringTypes()); - return size + serializedRowBodySize(row, header, previousUnfilteredSize, version); + return size + serializedRowBodySize(row, helper, previousUnfilteredSize, version); } - private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version) + private long serializedRowBodySize(Row row, SerializationHelper helper, long previousUnfilteredSize, int version) { long size = 0; + SerializationHeader header = helper.header; if (header.isForSSTable()) size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize); @@ -352,19 +354,16 @@ public class UnfilteredSerializer if (!hasAllColumns) size += Columns.serializer.serializedSubsetSize(row.columns(), header.columns(isStatic)); - SearchIterator<ColumnMetadata, ColumnMetadata> si = headerColumns.iterator(); - for (ColumnData data : row) - { + SearchIterator<ColumnMetadata, ColumnMetadata> si = helper.iterator(isStatic); + return row.accumulate((data, v) -> { ColumnMetadata column = si.next(data.column()); assert column != null; if (data.column.isSimple()) - size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header); + return v + Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header); else - size += 
sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header); - } - - return size; + return v + sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header); + }, size); } private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header) @@ -381,12 +380,12 @@ public class UnfilteredSerializer return size; } - private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version) + private long serializedSize(RangeTombstoneMarker marker, SerializationHelper helper, long previousUnfilteredSize, int version) { - assert !header.isForSSTable(); + assert !helper.header.isForSSTable(); return 1 // flags - + ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes()) - + serializedMarkerBodySize(marker, header, previousUnfilteredSize, version); + + ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), version, helper.header.clusteringTypes()) + + serializedMarkerBodySize(marker, helper.header, previousUnfilteredSize, version); } private long serializedMarkerBodySize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version) @@ -428,7 +427,7 @@ public class UnfilteredSerializer * @return the deserialized {@link Unfiltered} or {@code null} if we've read the end of a partition. This method is * guaranteed to never return empty rows. 
*/ - public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder) + public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder) throws IOException { while (true) @@ -453,7 +452,7 @@ public class UnfilteredSerializer * But as {@link UnfilteredRowIterator} should not return empty * rows, this mean consumer of this method should make sure to skip said empty rows. */ - private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder) + private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder) throws IOException { // It wouldn't be wrong per-se to use an unsorted builder, but it would be inefficient so make sure we don't do it by mistake @@ -481,7 +480,7 @@ public class UnfilteredSerializer } } - public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper) + public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, DeserializationHelper helper) throws IOException { while (true) @@ -533,7 +532,7 @@ public class UnfilteredSerializer } } - public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) + public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, DeserializationHelper helper) throws IOException { int flags = in.readUnsignedByte(); @@ -561,7 +560,7 @@ public class UnfilteredSerializer public Row deserializeRowBody(DataInputPlus in, SerializationHeader header, - SerializationHelper helper, + DeserializationHelper helper, int flags, int extendedFlags, Row.Builder builder) @@ -614,7 +613,7 @@ public class UnfilteredSerializer { throw new WrappedException(e); } - }, false); + }); } catch (WrappedException e) { @@ -636,7 +635,7 @@ public class 
UnfilteredSerializer } } - private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness) + private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness) throws IOException { if (helper.includes(column)) @@ -651,7 +650,7 @@ public class UnfilteredSerializer } } - private void readComplexColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness) + private void readComplexColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness) throws IOException { if (helper.includes(column)) @@ -686,7 +685,7 @@ public class UnfilteredSerializer in.skipBytesFully(rowSize); } - public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException + public void skipStaticRow(DataInputPlus in, SerializationHeader header, DeserializationHelper helper) throws IOException { int flags = in.readUnsignedByte(); assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : "Flags is " + flags; diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 190f136..686d874 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -181,7 +181,7 @@ public class CassandraStreamReader implements IStreamReader private final TableMetadata metadata; private final DataInputPlus in; private final SerializationHeader header; - private final SerializationHelper 
helper; + private final DeserializationHelper helper; private DecoratedKey key; private DeletionTime partitionLevelDeletion; @@ -193,7 +193,7 @@ public class CassandraStreamReader implements IStreamReader { this.metadata = metadata; this.in = in; - this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE); + this.helper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.PRESERVE_SIZE); this.header = header; } diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 1846aa5..76e12c8 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -57,7 +57,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file); if (!partitionLevelDeletion.validate()) UnfilteredValidation.handleInvalid(sstable.metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString()); - SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); + DeserializationHelper helper = new DeserializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL); SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, partitionLevelDeletion); return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator); } @@ -76,7 +76,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat dfile.seek(indexEntry.position); ByteBufferUtil.skipShortLength(dfile); // Skip partition 
key DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile); - SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); + DeserializationHelper helper = new DeserializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL); SSTableSimpleIterator iterator = tombstoneOnly ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile, sstable.header, helper, partitionLevelDeletion) : SSTableSimpleIterator.create(sstable.metadata(), dfile, sstable.header, helper, partitionLevelDeletion); diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java index c3c7472..fd1b6a0 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java @@ -38,21 +38,21 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> { final TableMetadata metadata; protected final DataInputPlus in; - protected final SerializationHelper helper; + protected final DeserializationHelper helper; - private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, SerializationHelper helper) + private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, DeserializationHelper helper) { this.metadata = metadata; this.in = in; this.helper = helper; } - public static SSTableSimpleIterator create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion) + public static SSTableSimpleIterator create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, DeletionTime partitionDeletion) { return new CurrentFormatIterator(metadata, in, header, helper); } - public static 
SSTableSimpleIterator createTombstoneOnly(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion) + public static SSTableSimpleIterator createTombstoneOnly(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, DeletionTime partitionDeletion) { return new CurrentFormatTombstoneIterator(metadata, in, header, helper); } @@ -65,7 +65,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> private final Row.Builder builder; - private CurrentFormatIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) + private CurrentFormatIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper) { super(metadata, in, helper); this.header = header; @@ -95,7 +95,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> { private final SerializationHeader header; - private CurrentFormatTombstoneIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) + private CurrentFormatTombstoneIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper) { super(metadata, in, helper); this.header = header; diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 369be12..7ac2ebc 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -31,6 +31,7 @@ import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.SerializationHelper; import 
org.apache.cassandra.db.rows.UnfilteredSerializer; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.TableMetadataRef; @@ -56,6 +57,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter // Used to compute the row serialized size private final SerializationHeader header; + private final SerializationHelper helper; private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>(); private final DiskWriter diskWriter = new DiskWriter(); @@ -65,6 +67,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter super(directory, metadata, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS); + this.helper = new SerializationHelper(this.header); diskWriter.start(); } @@ -90,7 +93,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing // to count since it will take a lot more space in memory and the bufferSize if first and foremost used to avoid OOM when // using this writer. 
- currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion()); + currentSize += UnfilteredSerializer.serializer.serializedSize(row, helper, 0, formatType.info.getLatestVersion().correspondingMessagingVersion()); } private void maybeSync() throws SyncException diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java index 422eaa8..ed0eb9b 100644 --- a/src/java/org/apache/cassandra/service/paxos/Commit.java +++ b/src/java/org/apache/cassandra/service/paxos/Commit.java @@ -117,7 +117,7 @@ public class Commit public Commit deserialize(DataInputPlus in, int version) throws IOException { UUID ballot = UUIDSerializer.serializer.deserialize(in, version); - PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL); + PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, DeserializationHelper.Flag.LOCAL); return new Commit(ballot, update); } diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java similarity index 67% copy from src/java/org/apache/cassandra/utils/WrappedInt.java copy to src/java/org/apache/cassandra/utils/BiLongAccumulator.java index a106575..2c3d6b5 100644 --- a/src/java/org/apache/cassandra/utils/WrappedInt.java +++ b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java @@ -18,35 +18,7 @@ package org.apache.cassandra.utils; -/** - * Simple wrapper for native int type - */ -public class WrappedInt +public interface BiLongAccumulator<T, A> { - private int value; - - public WrappedInt(int initial) - { - this.value = initial; - } - - public int get() - { - return value; - } - - public void set(int value) - { - this.value = value; - } - - public void increment() - { - ++value; - } - - public void decrement() - { - --value; - } + long apply(T obj, A arguemnt, 
long v); } diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java b/src/java/org/apache/cassandra/utils/LongAccumulator.java similarity index 67% rename from src/java/org/apache/cassandra/utils/WrappedInt.java rename to src/java/org/apache/cassandra/utils/LongAccumulator.java index a106575..fe3c195 100644 --- a/src/java/org/apache/cassandra/utils/WrappedInt.java +++ b/src/java/org/apache/cassandra/utils/LongAccumulator.java @@ -18,35 +18,7 @@ package org.apache.cassandra.utils; -/** - * Simple wrapper for native int type - */ -public class WrappedInt +public interface LongAccumulator<T> { - private int value; - - public WrappedInt(int initial) - { - this.value = initial; - } - - public int get() - { - return value; - } - - public void set(int value) - { - this.value = value; - } - - public void increment() - { - ++value; - } - - public void decrement() - { - --value; - } + long apply(T obj, long v); } diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index 97e935e..6c546ac 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -19,14 +19,18 @@ package org.apache.cassandra.utils.btree; import java.util.*; +import java.util.function.BiConsumer; import java.util.function.Consumer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import com.google.common.collect.Ordering; +import org.apache.cassandra.utils.BiLongAccumulator; +import org.apache.cassandra.utils.LongAccumulator; import org.apache.cassandra.utils.ObjectSizes; import static com.google.common.collect.Iterables.concat; @@ -691,7 +695,7 @@ public class BTree } // returns true if the provided node is a leaf, false if it is a branch - static boolean isLeaf(Object[] node) + public static 
boolean isLeaf(Object[] node) { return (node.length & 1) == 1; } @@ -1285,36 +1289,46 @@ public class BTree return compare(cmp, previous, max) < 0; } - /** - * Simple method to walk the btree forwards or reversed and apply a function to each element - * - * Public method - * - */ - public static <V> void apply(Object[] btree, Consumer<V> function, boolean reversed) + private static <V, A> void applyValue(V value, BiConsumer<A, V> function, A argument) { - if (reversed) - applyReverse(btree, function, null); - else - applyForwards(btree, function, null); + function.accept(argument, value); + } + + public static <V, A> void applyLeaf(Object[] btree, BiConsumer<A, V> function, A argument) + { + Preconditions.checkArgument(isLeaf(btree)); + int limit = getLeafKeyEnd(btree); + for (int i=0; i<limit; i++) + applyValue((V) btree[i], function, argument); } /** - * Simple method to walk the btree forwards or reversed and apply a function till a stop condition is reached + * Simple method to walk the btree forwards and apply a function till a stop condition is reached * - * Public method + * Private method * + * @param btree + * @param function */ - public static <V> void apply(Object[] btree, Consumer<V> function, Predicate<V> stopCondition, boolean reversed) + public static <V, A> void apply(Object[] btree, BiConsumer<A, V> function, A argument) { - if (reversed) - applyReverse(btree, function, stopCondition); - else - applyForwards(btree, function, stopCondition); - } + if (isLeaf(btree)) + { + applyLeaf(btree, function, argument); + return; + } + int childOffset = getChildStart(btree); + int limit = btree.length - 1 - childOffset; + for (int i = 0 ; i < limit ; i++) + { + apply((Object[]) btree[childOffset + i], function, argument); + if (i < childOffset) + applyValue((V) btree[i], function, argument); + } + } /** * Simple method to walk the btree forwards and apply a function till a stop condition is reached @@ -1323,89 +1337,115 @@ public class BTree * * @param btree * 
@param function - * @param stopCondition */ - private static <V> boolean applyForwards(Object[] btree, Consumer<V> function, Predicate<V> stopCondition) + public static <V> void apply(Object[] btree, Consumer<V> function) { - boolean isLeaf = isLeaf(btree); - int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree); - int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1; - for (int i = 0 ; i < limit ; i++) - { - // we want to visit in iteration order, so we visit our key nodes inbetween our children - int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0); - Object current = btree[idx]; - if (idx < childOffset) - { - V castedCurrent = (V) current; - if (stopCondition != null && stopCondition.apply(castedCurrent)) - return true; + BTree.<V, Consumer<V>>apply(btree, Consumer::accept, function); + } - function.accept(castedCurrent); - } - else - { - if (applyForwards((Object[]) current, function, stopCondition)) - return true; - } + private static <V> int find(Object[] btree, V from, Comparator<V> comparator) + { + // find the start index in iteration order + Preconditions.checkNotNull(comparator); + int keyEnd = getKeyEnd(btree); + return Arrays.binarySearch((V[]) btree, 0, keyEnd, from, comparator); + } + + private static boolean isStopSentinel(long v) + { + return v == Long.MAX_VALUE; + } + + private static <V, A> long accumulateLeaf(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, Comparator<V> comparator, V from, long initialValue) + { + Preconditions.checkArgument(isLeaf(btree)); + long value = initialValue; + int limit = getLeafKeyEnd(btree); + + int startIdx = 0; + if (from != null) + { + int i = find(btree, from, comparator); + boolean isExact = i >= 0; + startIdx = isExact ? 
i : (-1 - i); } - return false; + for (int i = startIdx; i < limit; i++) + { + value = accumulator.apply(arg, (V) btree[i], value); + + if (isStopSentinel(value)) + break; + } + return value; } /** - * Simple method to walk the btree in reverse and apply a function till a stop condition is reached + * Walk the btree and accumulate a long value using the supplied accumulator function. Iteration will stop if the + * accumulator function returns the sentinel value Long.MAX_VALUE * - * Private method - * - * @param btree - * @param function - * @param stopCondition + * If the optional from argument is not null, iteration will start from that value (or the one after its insertion + * point if an exact match isn't found) */ - private static <V> boolean applyReverse(Object[] btree, Consumer<V> function, Predicate<V> stopCondition) + public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, Comparator<V> comparator, V from, long initialValue) { - boolean isLeaf = isLeaf(btree); - int childOffset = isLeaf ? 0 : getChildStart(btree); - int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1; - for (int i = limit - 1, visited = 0; i >= 0 ; i--, visited++) + if (isLeaf(btree)) + return accumulateLeaf(btree, accumulator, arg, comparator, from, initialValue); + + long value = initialValue; + int childOffset = getChildStart(btree); + + int startChild = 0; + if (from != null) { - int idx = i; + int i = find(btree, from, comparator); + boolean isExact = i >= 0; - // we want to visit in reverse iteration order, so we visit our children nodes inbetween our keys - if (!isLeaf) - { - int typeOffset = visited / 2; + startChild = isExact ? i + 1 : -1 - i; - if (i % 2 == 0) - { - // This is a child branch. Since children are in the second half of the array, we must - // adjust for the key's we've visited along the way - idx += typeOffset; - } - else - { - // This is a key.
Since the keys are in the first half of the array and we are iterating - // in reverse we subtract the childOffset and adjust for children we've walked so far - idx = i - childOffset + typeOffset; - } + if (isExact) + { + value = accumulator.apply(arg, (V) btree[i], value); + if (isStopSentinel(value)) + return value; + from = null; } + } - Object current = btree[idx]; - if (isLeaf || idx < childOffset) - { - V castedCurrent = (V) current; - if (stopCondition != null && stopCondition.apply(castedCurrent)) - return true; + int limit = btree.length - 1 - childOffset; + for (int i=startChild; i<limit; i++) + { + value = accumulate((Object[]) btree[childOffset + i], accumulator, arg, comparator, from, value); - function.accept(castedCurrent); - } - else + if (isStopSentinel(value)) + break; + + if (i < childOffset) { - if (applyReverse((Object[]) current, function, stopCondition)) - return true; + value = accumulator.apply(arg, (V) btree[i], value); + // stop if a sentinel stop value was returned + if (isStopSentinel(value)) + break; } + + if (from != null) + from = null; } + return value; + } + + public static <V> long accumulate(Object[] btree, LongAccumulator<V> accumulator, Comparator<V> comparator, V from, long initialValue) + { + return accumulate(btree, LongAccumulator::apply, accumulator, comparator, from, initialValue); + } - return false; + public static <V> long accumulate(Object[] btree, LongAccumulator<V> accumulator, long initialValue) + { + return accumulate(btree, accumulator, null, null, initialValue); + } + + public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, long initialValue) + { + return accumulate(btree, accumulator, arg, null, null, initialValue); } } diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java index 2fcece6..2ad7f40 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java 
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java @@ -25,4 +25,8 @@ import org.apache.cassandra.utils.IndexedSearchIterator; public interface BTreeSearchIterator<K, V> extends IndexedSearchIterator<K, V>, Iterator<V> { + /** + * Reset this Iterator to its starting position + */ + public void rewind(); } diff --git a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java index a6197f8..29aed4b 100644 --- a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java +++ b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java @@ -45,8 +45,13 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V> this.comparator = comparator; this.lowerBound = lowerBound; this.upperBound = upperBound; - this.nextPos = forwards ? lowerBound : upperBound; - this.hasNext = nextPos >= lowerBound && nextPos <= upperBound; + rewind(); + } + + public void rewind() + { + nextPos = forwards ? lowerBound : upperBound; + hasNext = nextPos >= lowerBound && nextPos <= upperBound; } public V next() @@ -73,12 +78,30 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V> return Arrays.binarySearch(keys, lb, ub + 1, key, comparator); } + private void updateHasNext() + { + hasNext = nextPos >= lowerBound && nextPos <= upperBound; + } + public V next(K key) { if (!hasNext) return null; V result = null; + // first check the current position in case of sequential access + if (comparator.compare(key, keys[nextPos]) == 0) + { + hasCurrent = true; + result = (V) keys[nextPos]; + nextPos += forwards ? 1 : -1; + } + updateHasNext(); + + if (result != null || !hasNext) + return result; + + // otherwise search against the remaining values int find = searchNext(key); if (find >= 0) { @@ -91,7 +114,7 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V> nextPos = (forwards ? 
-1 : -2) - find; hasCurrent = false; } - hasNext = nextPos >= lowerBound && nextPos <= upperBound; + updateHasNext(); return result; } diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java index e2c6e33..7fadac4 100644 --- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java +++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java @@ -35,15 +35,12 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import io.netty.util.concurrent.FastThreadLocalThread; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.UpdateBuilder; -import org.apache.cassandra.config.Config.CommitLogSync; import org.apache.cassandra.config.*; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.marshal.UTF8Type; @@ -451,7 +448,7 @@ public abstract class CommitLogStressTest { mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), - SerializationHelper.Flag.LOCAL); + DeserializationHelper.Flag.LOCAL); } catch (IOException e) { diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index 8709775..e0215b7 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterators; import 
org.apache.cassandra.dht.Range; @@ -368,7 +369,7 @@ public class ReadCommandTest MessagingService.current_version, cfs.metadata(), columnFilter, - SerializationHelper.Flag.LOCAL)); + DeserializationHelper.Flag.LOCAL)); } } diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java index c52fd3d..392a1a0 100644 --- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java +++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java @@ -34,6 +34,7 @@ import org.junit.Test; import org.apache.cassandra.Util; import org.apache.cassandra.cache.IMeasurableMemory; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; @@ -279,6 +280,7 @@ public class RowIndexEntryTest extends CQLTester { private final UnfilteredRowIterator iterator; private final SequentialWriter writer; + private final SerializationHelper helper; private final SerializationHeader header; private final int version; @@ -306,6 +308,7 @@ public class RowIndexEntryTest extends CQLTester { this.iterator = iterator; this.writer = writer; + this.helper = new SerializationHelper(header); this.header = header; this.version = version; this.observers = observers == null ? 
Collections.emptyList() : observers; @@ -317,7 +320,7 @@ public class RowIndexEntryTest extends CQLTester ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer); DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer); if (header.hasStatic()) - UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version); + UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), helper, writer, version); } public ColumnIndex build() throws IOException @@ -358,7 +361,7 @@ public class RowIndexEntryTest extends CQLTester startPosition = pos; } - UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version); + UnfilteredSerializer.serializer.serialize(unfiltered, helper, writer, pos - previousRowStart, version); // notify observers about each new row if (!observers.isEmpty()) diff --git a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java index 18bc6e0..fa3295a 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.RebufferingInputStream; @@ -62,7 +62,7 @@ public class CDCTestReplayer extends CommitLogReplayer Mutation mutation; try { - mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL); if (mutation.trackedByCDC()) sawCDCMutation = true; } diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java index 9a22b04..5b87d68 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java @@ -26,7 +26,7 @@ import org.junit.Assert; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.rows.SerializationHelper; +import org.apache.cassandra.db.rows.DeserializationHelper; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.RebufferingInputStream; @@ -65,7 +65,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer Mutation mutation; try { - mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); + mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL); Assert.assertTrue(processor.apply(mutation)); } catch (IOException e) diff --git a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java index 7cc1291..e60fb64 100644 --- a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java +++ b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java @@ -88,14 +88,20 @@ public class BTreeTest } }; - private static List<Integer> seq(int count) + private static List<Integer> seq(int count, int interval) { List<Integer> r = new ArrayList<>(); for (int i = 0 ; i < count ; i++) - r.add(i); + if (i % interval == 0) + r.add(i); return r; } + private static List<Integer> seq(int count) + { + return seq(count, 1); + } + private static List<Integer> rand(int count) { Random rand = ThreadLocalRandom.current(); @@ -133,27 +139,60 @@ public class BTreeTest } @Test - public void testApplyForwards() + public void testApply() { List<Integer> input = seq(71); 
Object[] btree = BTree.build(input, noOp); final List<Integer> result = new ArrayList<>(); - BTree.<Integer>apply(btree, i -> result.add(i), false); + BTree.<Integer>apply(btree, i -> result.add(i)); org.junit.Assert.assertArrayEquals(input.toArray(),result.toArray()); } @Test - public void testApplyReverse() + public void inOrderAccumulation() { List<Integer> input = seq(71); Object[] btree = BTree.build(input, noOp); + long result = BTree.<Integer>accumulate(btree, (o, l) -> { + Assert.assertEquals((long) o, l + 1); + return o; + }, -1); + Assert.assertEquals(result, 70); + } - final List<Integer> result = new ArrayList<>(); - BTree.<Integer>apply(btree, i -> result.add(i), true); + @Test + public void accumulateFrom() + { + int limit = 100; + for (int interval=1; interval<=5; interval++) + { + List<Integer> input = seq(limit, interval); + Object[] btree = BTree.build(input, noOp); + for (int start=0; start<=limit; start+=interval) + { + int thisInterval = interval; + String errMsg = String.format("interval=%s, start=%s", interval, start); + long result = BTree.accumulate(btree, (o, l) -> { + Assert.assertEquals(errMsg, (long) o, l + thisInterval); + return o; + }, Comparator.naturalOrder(), start, start - thisInterval); + Assert.assertEquals(errMsg, result, (limit-1)/interval*interval); + } + } + } - org.junit.Assert.assertArrayEquals(Lists.reverse(input).toArray(),result.toArray()); + /** + * accumulate function should not be called if we ask it to start past the end of the btree + */ + @Test + public void accumulateFromEnd() + { + List<Integer> input = seq(100); + Object[] btree = BTree.build(input, noOp); + long result = BTree.accumulate(btree, (o, l) -> 1, Integer::compareTo, 101, 0L); + Assert.assertEquals(0, result); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org