This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8576e76 Minimize BTree iterator allocations
8576e76 is described below
commit 8576e769d13b3e887ea604074641fd4c42af5e8a
Author: Blake Eggleston <[email protected]>
AuthorDate: Tue Oct 15 20:10:42 2019 -0700
Minimize BTree iterator allocations
Patch by Blake Eggleston; Reviewed by Benedict Elliott Smith for
CASSANDRA-15389
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ColumnIndex.java | 6 +-
src/java/org/apache/cassandra/db/Columns.java | 9 +-
src/java/org/apache/cassandra/db/Mutation.java | 6 +-
src/java/org/apache/cassandra/db/ReadResponse.java | 8 +-
.../apache/cassandra/db/SerializationHeader.java | 1 +
.../cassandra/db/UnfilteredDeserializer.java | 6 +-
.../db/columniterator/AbstractSSTableIterator.java | 6 +-
.../cassandra/db/commitlog/CommitLogReader.java | 4 +-
.../db/partitions/CachedBTreePartition.java | 4 +-
.../cassandra/db/partitions/PartitionUpdate.java | 7 +-
.../partitions/UnfilteredPartitionIterators.java | 2 +-
.../org/apache/cassandra/db/rows/AbstractRow.java | 18 +-
.../org/apache/cassandra/db/rows/BTreeRow.java | 103 +++++++----
src/java/org/apache/cassandra/db/rows/Cell.java | 2 +-
.../org/apache/cassandra/db/rows/ColumnData.java | 5 +
.../cassandra/db/rows/ComplexColumnData.java | 18 +-
...ationHelper.java => DeserializationHelper.java} | 6 +-
src/java/org/apache/cassandra/db/rows/Row.java | 23 ++-
src/java/org/apache/cassandra/db/rows/Rows.java | 72 ++++---
.../cassandra/db/rows/SerializationHelper.java | 134 +++-----------
.../db/rows/UnfilteredRowIteratorSerializer.java | 21 ++-
.../cassandra/db/rows/UnfilteredSerializer.java | 95 +++++-----
.../db/streaming/CassandraStreamReader.java | 4 +-
.../io/sstable/SSTableIdentityIterator.java | 4 +-
.../io/sstable/SSTableSimpleIterator.java | 12 +-
.../io/sstable/SSTableSimpleUnsortedWriter.java | 5 +-
.../org/apache/cassandra/service/paxos/Commit.java | 2 +-
.../{WrappedInt.java => BiLongAccumulator.java} | 32 +---
.../{WrappedInt.java => LongAccumulator.java} | 32 +---
.../org/apache/cassandra/utils/btree/BTree.java | 206 ++++++++++++---------
.../cassandra/utils/btree/BTreeSearchIterator.java | 4 +
.../utils/btree/LeafBTreeSearchIterator.java | 29 ++-
.../db/commitlog/CommitLogStressTest.java | 5 +-
.../org/apache/cassandra/db/ReadCommandTest.java | 3 +-
.../org/apache/cassandra/db/RowIndexEntryTest.java | 7 +-
.../cassandra/db/commitlog/CDCTestReplayer.java | 4 +-
.../db/commitlog/CommitLogTestReplayer.java | 4 +-
.../apache/cassandra/utils/btree/BTreeTest.java | 55 +++++-
39 files changed, 492 insertions(+), 473 deletions(-)
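
Two small functional interfaces underpin this patch: LongAccumulator and BiLongAccumulator (both created above from copies of WrappedInt.java). Their bodies are not shown in this excerpt; the following is a minimal sketch of their likely shape, inferred only from the call sites below (BTree.accumulate, Row.accumulate), which thread a primitive long through each element so that boxed wrappers like WrappedInt and per-row capturing lambdas can be dropped:

    // Sketch only: shapes inferred from the call sites in this diff,
    // not copied from the new files.
    @FunctionalInterface
    interface LongAccumulator<T>
    {
        long apply(T value, long accumulated);
    }

    @FunctionalInterface
    interface BiLongAccumulator<A, T>
    {
        long apply(A arg, T value, long accumulated);
    }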
diff --git a/CHANGES.txt b/CHANGES.txt
index 96ce286..c1e5aeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Minimize BTree iterator allocations (CASSANDRA-15389)
* Add client request size server metrics (CASSANDRA-15704)
* Add additional logging around FileUtils and compaction leftover cleanup
(CASSANDRA-15705)
* Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh
(CASSANDRA-15711)
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 74ad264..e11f784 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -53,6 +53,7 @@ public class ColumnIndex
public int columnIndexCount;
private int[] indexOffsets;
+ private final SerializationHelper helper;
private final SerializationHeader header;
private final int version;
private final SequentialWriter writer;
@@ -79,6 +80,7 @@ public class ColumnIndex
Collection<SSTableFlushObserver> observers,
ISerializer<IndexInfo> indexInfoSerializer)
{
+ this.helper = new SerializationHelper(header);
this.header = header;
this.writer = writer;
this.version = version.correspondingMessagingVersion();
@@ -126,7 +128,7 @@ public class ColumnIndex
{
Row staticRow = iterator.staticRow();
- UnfilteredSerializer.serializer.serializeStaticRow(staticRow, header, writer, version);
+ UnfilteredSerializer.serializer.serializeStaticRow(staticRow, helper, writer, version);
if (!observers.isEmpty())
observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
}
@@ -248,7 +250,7 @@ public class ColumnIndex
startPosition = pos;
}
- UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+ UnfilteredSerializer.serializer.serialize(unfiltered, helper, writer, pos - previousRowStart, version);
// notify observers about each new row
if (!observers.isEmpty())
diff --git a/src/java/org/apache/cassandra/db/Columns.java
b/src/java/org/apache/cassandra/db/Columns.java
index f56072e..fe13919 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -53,7 +53,7 @@ public class Columns extends
AbstractCollection<ColumnMetadata> implements Colle
public static final Serializer serializer = new Serializer();
public static final Columns NONE = new Columns(BTree.empty(), 0);
- private static final ColumnMetadata FIRST_COMPLEX_STATIC =
+ public static final ColumnMetadata FIRST_COMPLEX_STATIC =
new ColumnMetadata("",
"",
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER,
UTF8Type.instance),
@@ -61,7 +61,7 @@ public class Columns extends
AbstractCollection<ColumnMetadata> implements Colle
ColumnMetadata.NO_POSITION,
ColumnMetadata.Kind.STATIC);
- private static final ColumnMetadata FIRST_COMPLEX_REGULAR =
+ public static final ColumnMetadata FIRST_COMPLEX_REGULAR =
new ColumnMetadata("",
"",
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER,
UTF8Type.instance),
@@ -382,11 +382,10 @@ public class Columns extends
AbstractCollection<ColumnMetadata> implements Colle
/**
* Apply a function to each column definition in forwards or reversed
order.
* @param function
- * @param reversed
*/
- public void apply(Consumer<ColumnMetadata> function, boolean reversed)
+ public void apply(Consumer<ColumnMetadata> function)
{
- BTree.apply(columns, function, reversed);
+ BTree.apply(columns, function);
}
@Override
diff --git a/src/java/org/apache/cassandra/db/Mutation.java
b/src/java/org/apache/cassandra/db/Mutation.java
index 22c4ed8..3d27ef3 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -336,7 +336,7 @@ public class Mutation implements IMutation
PartitionUpdate.serializer.serialize(entry.getValue(), out,
version);
}
- public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public Mutation deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
int size = (int)in.readUnsignedVInt();
assert size > 0;
@@ -359,7 +359,7 @@ public class Mutation implements IMutation
public Mutation deserialize(DataInputPlus in, int version) throws
IOException
{
- return deserialize(in, version, SerializationHelper.Flag.FROM_REMOTE);
+ return deserialize(in, version, DeserializationHelper.Flag.FROM_REMOTE);
}
public long serializedSize(Mutation mutation, int version)
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java
b/src/java/org/apache/cassandra/db/ReadResponse.java
index affbbbe..3f6481d 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -182,7 +182,7 @@ public abstract class ReadResponse
command.getRepairedDataDigest(),
command.isRepairedDataDigestConclusive(),
MessagingService.current_version,
- SerializationHelper.Flag.LOCAL);
+ DeserializationHelper.Flag.LOCAL);
}
private static ByteBuffer build(UnfilteredPartitionIterator iter,
ColumnFilter selection)
@@ -208,7 +208,7 @@ public abstract class ReadResponse
boolean isRepairedDigestConclusive,
int version)
{
- super(data, repairedDataDigest, isRepairedDigestConclusive, version, SerializationHelper.Flag.FROM_REMOTE);
+ super(data, repairedDataDigest, isRepairedDigestConclusive, version, DeserializationHelper.Flag.FROM_REMOTE);
}
}
@@ -220,13 +220,13 @@ public abstract class ReadResponse
private final ByteBuffer repairedDataDigest;
private final boolean isRepairedDigestConclusive;
private final int dataSerializationVersion;
- private final SerializationHelper.Flag flag;
+ private final DeserializationHelper.Flag flag;
protected DataResponse(ByteBuffer data,
ByteBuffer repairedDataDigest,
boolean isRepairedDigestConclusive,
int dataSerializationVersion,
- SerializationHelper.Flag flag)
+ DeserializationHelper.Flag flag)
{
super();
this.data = data;
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 15ef268..1c22feb 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.ImmutableList;
+
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 84ff691..f9ff1d7 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -34,7 +34,7 @@ public class UnfilteredDeserializer
{
protected final TableMetadata metadata;
protected final DataInputPlus in;
- protected final SerializationHelper helper;
+ protected final DeserializationHelper helper;
private final ClusteringPrefix.Deserializer clusteringDeserializer;
private final SerializationHeader header;
@@ -49,7 +49,7 @@ public class UnfilteredDeserializer
private UnfilteredDeserializer(TableMetadata metadata,
DataInputPlus in,
SerializationHeader header,
- SerializationHelper helper)
+ DeserializationHelper helper)
{
this.metadata = metadata;
this.in = in;
@@ -62,7 +62,7 @@ public class UnfilteredDeserializer
public static UnfilteredDeserializer create(TableMetadata metadata,
DataInputPlus in,
SerializationHeader header,
- SerializationHelper helper)
+ DeserializationHelper helper)
{
return new UnfilteredDeserializer(metadata, in, header, helper);
}
diff --git
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index cfc7da2..c631f1c 100644
---
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -43,7 +43,7 @@ public abstract class AbstractSSTableIterator implements
UnfilteredRowIterator
protected final DecoratedKey key;
protected final DeletionTime partitionLevelDeletion;
protected final ColumnFilter columns;
- protected final SerializationHelper helper;
+ protected final DeserializationHelper helper;
protected final Row staticRow;
protected final Reader reader;
@@ -70,7 +70,7 @@ public abstract class AbstractSSTableIterator implements
UnfilteredRowIterator
this.key = key;
this.columns = columnFilter;
this.slices = slices;
- this.helper = new SerializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL, columnFilter);
+ this.helper = new DeserializationHelper(metadata, sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL, columnFilter);
if (indexEntry == null)
{
@@ -159,7 +159,7 @@ public abstract class AbstractSSTableIterator implements
UnfilteredRowIterator
private static Row readStaticRow(SSTableReader sstable,
FileDataInput file,
- SerializationHelper helper,
+ DeserializationHelper helper,
Columns statics) throws IOException
{
if (!sstable.header.hasStatic())
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index c91841f..5123580 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.Mutation;
import
org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
import
org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -429,7 +429,7 @@ public class CommitLogReader
{
mutation = Mutation.serializer.deserialize(bufIn,
desc.getMessagingVersion(),
- SerializationHelper.Flag.LOCAL);
+ DeserializationHelper.Flag.LOCAL);
// doublecheck that what we read is still valid for the current schema
for (PartitionUpdate upd : mutation.getPartitionUpdates())
upd.validate();
diff --git
a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index 4d9c227..9a2b331 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -177,11 +177,11 @@ public class CachedBTreePartition extends
ImmutableBTreePartition implements Cac
TableMetadata metadata =
Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
- UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, SerializationHelper.Flag.LOCAL);
+ UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, DeserializationHelper.Flag.LOCAL);
assert !header.isReversed && header.rowEstimate >= 0;
Holder holder;
- try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, SerializationHelper.Flag.LOCAL, header))
+ try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, DeserializationHelper.Flag.LOCAL, header))
{
holder = ImmutableBTreePartition.build(partition,
header.rowEstimate);
}
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index ec01fa6..076c975 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.db.partitions;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
import java.util.List;
import com.google.common.collect.Iterables;
@@ -38,7 +36,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
@@ -230,7 +227,7 @@ public class PartitionUpdate extends AbstractBTreePartition
{
return serializer.deserialize(new DataInputBuffer(bytes, true),
version,
- SerializationHelper.Flag.LOCAL);
+ DeserializationHelper.Flag.LOCAL);
}
catch (IOException e)
{
@@ -636,7 +633,7 @@ public class PartitionUpdate extends AbstractBTreePartition
}
}
- public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public PartitionUpdate deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
TableMetadata metadata =
Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
UnfilteredRowIteratorSerializer.Header header =
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null,
in, version, flag);
diff --git
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 945bcb4..30b4d9e 100644
---
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -316,7 +316,7 @@ public abstract class UnfilteredPartitionIterators
out.writeBoolean(false);
}
- public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final TableMetadata metadata, final ColumnFilter selection, final SerializationHelper.Flag flag) throws IOException
+ public UnfilteredPartitionIterator deserialize(final DataInputPlus in, final int version, final TableMetadata metadata, final ColumnFilter selection, final DeserializationHelper.Flag flag) throws IOException
{
// Skip now unused isForThrift boolean
in.readBoolean();
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 957ffd4..fc90e34 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -24,10 +24,11 @@ import java.util.stream.StreamSupport;
import com.google.common.collect.Iterables;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Digest;
import org.apache.cassandra.db.marshal.CollectionType;
import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.serializers.MarshalException;
/**
@@ -66,8 +67,7 @@ public abstract class AbstractRow implements Row
deletion().digest(digest);
primaryKeyLivenessInfo().digest(digest);
- for (ColumnData cd : this)
- cd.digest(digest);
+ apply(ColumnData::digest, digest);
}
public void validateData(TableMetadata metadata)
@@ -93,15 +93,7 @@ public abstract class AbstractRow implements Row
if (deletion().time().localDeletionTime() < 0)
throw new MarshalException("A local deletion time should not be
negative in '" + metadata + "'");
- for (ColumnData cd : this)
- try
- {
- cd.validate();
- }
- catch (Exception e)
- {
- throw new MarshalException("data for '" +
cd.column.debugString() + "', " + cd + " in '" + metadata + "' didn't
validate", e);
- }
+ apply(cd -> cd.validate());
}
public boolean hasInvalidDeletions()
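
The switch from the for-each loops to apply(ColumnData::digest, digest) pairs with the static ColumnData.digest(Digest, ColumnData) helper added further down in this diff: passing the Digest as an explicit argument lets the method reference compile to a non-capturing, cacheable BiConsumer, where a lambda capturing digest would normally allocate a fresh object on every call. A hypothetical caller, for illustration only (digestRow is a made-up name):

    // Illustration, not from the patch.
    void digestRow(Row row, Digest digest)
    {
        row.apply(cd -> cd.digest(digest));    // captures digest: a new lambda object per call
        row.apply(ColumnData::digest, digest); // non-capturing method reference: no per-call allocation
    }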
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index dc3219a..6689c77 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -19,13 +19,14 @@ package org.apache.cassandra.db.rows;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import com.google.common.base.Function;
import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
@@ -53,6 +54,11 @@ public class BTreeRow extends AbstractRow
// The data for each columns present in this row in column sorted order.
private final Object[] btree;
+ private static final ColumnData FIRST_COMPLEX_STATIC = new ComplexColumnData(Columns.FIRST_COMPLEX_STATIC, new Object[0], new DeletionTime(0, 0));
+ private static final ColumnData FIRST_COMPLEX_REGULAR = new ComplexColumnData(Columns.FIRST_COMPLEX_REGULAR, new Object[0], new DeletionTime(0, 0));
+ private static final Comparator<ColumnData> COLUMN_COMPARATOR = (cd1, cd2) -> cd1.column.compareTo(cd2.column);
+
+
// We need to filter the tombstones of a row on every read (twice in fact:
first to remove purgeable tombstone, and then after reconciliation to remove
// all tombstone since we don't return them to the client) as well as on
compaction. But it's likely that many rows won't have any tombstone at all, so
// we want to speed up that case by not having to iterate/copy the row in
this case. We could keep a single boolean telling us if we have tombstones,
@@ -90,8 +96,8 @@ public class BTreeRow extends AbstractRow
int minDeletionTime =
Math.min(minDeletionTime(primaryKeyLivenessInfo),
minDeletionTime(deletion.time()));
if (minDeletionTime != Integer.MIN_VALUE)
{
- for (ColumnData cd : BTree.<ColumnData>iterable(btree))
- minDeletionTime = Math.min(minDeletionTime,
minDeletionTime(cd));
+ long result = BTree.<ColumnData>accumulate(btree, (cd, l) ->
Math.min(l, minDeletionTime(cd)) , minDeletionTime);
+ minDeletionTime = Ints.checkedCast(result);
}
return create(clustering, primaryKeyLivenessInfo, deletion, btree,
minDeletionTime);
@@ -168,23 +174,49 @@ public class BTreeRow extends AbstractRow
return cd.column().isSimple() ? minDeletionTime((Cell) cd) :
minDeletionTime((ComplexColumnData)cd);
}
- public void apply(Consumer<ColumnData> function, boolean reversed)
+ public void apply(Consumer<ColumnData> function)
+ {
+ BTree.apply(btree, function);
+ }
+
+ public <A> void apply(BiConsumer<A, ColumnData> function, A arg)
+ {
+ BTree.apply(btree, function, arg);
+ }
+
+ public long accumulate(LongAccumulator<ColumnData> accumulator, long
initialValue)
{
- BTree.apply(btree, function, reversed);
+ return BTree.accumulate(btree, accumulator, initialValue);
}
- public void apply(Consumer<ColumnData> funtion,
com.google.common.base.Predicate<ColumnData> stopCondition, boolean reversed)
+ public long accumulate(LongAccumulator<ColumnData> accumulator,
Comparator<ColumnData> comparator, ColumnData from, long initialValue)
{
- BTree.apply(btree, funtion, stopCondition, reversed);
+ return BTree.accumulate(btree, accumulator, comparator, from,
initialValue);
+ }
+
+ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A
arg, long initialValue)
+ {
+ return BTree.accumulate(btree, accumulator, arg, initialValue);
+ }
+
+ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A
arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue)
+ {
+ return BTree.accumulate(btree, accumulator, arg, comparator, from,
initialValue);
}
private static int minDeletionTime(Object[] btree, LivenessInfo info,
DeletionTime rowDeletion)
{
- //we have to wrap this for the lambda
- final WrappedInt min = new WrappedInt(Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)));
+ long min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion));
+
+ min = BTree.<ColumnData>accumulate(btree, (cd, l) -> {
+ int m = Math.min((int) l, minDeletionTime(cd));
+ return m != Integer.MIN_VALUE ? m : Long.MAX_VALUE;
+ }, min);
- BTree.<ColumnData>apply(btree, cd -> min.set(Math.min(min.get(), minDeletionTime(cd))), cd -> min.get() == Integer.MIN_VALUE, false);
- return min.get();
+ if (min == Long.MAX_VALUE)
+ return Integer.MIN_VALUE;
+
+ return Ints.checkedCast(min);
}
public Clustering clustering()
@@ -334,33 +366,19 @@ public class BTreeRow extends AbstractRow
public boolean hasComplex()
{
- // We start by the end cause we know complex columns sort after the simple ones
- ColumnData cd = Iterables.getFirst(BTree.<ColumnData>iterable(btree, BTree.Dir.DESC), null);
- return cd != null && cd.column.isComplex();
+ if (BTree.isEmpty(btree))
+ return false;
+
+ int size = BTree.size(btree);
+ ColumnData last = BTree.findByIndex(btree, size - 1);
+ return last.column.isComplex();
}
public boolean hasComplexDeletion()
{
- final WrappedBoolean result = new WrappedBoolean(false);
-
- // We start by the end cause we know complex columns sort before simple ones
- apply(c -> {}, cd -> {
- if (cd.column.isSimple())
- {
- result.set(false);
- return true;
- }
-
- if (!((ComplexColumnData) cd).complexDeletion().isLive())
- {
- result.set(true);
- return true;
- }
-
- return false;
- }, true);
-
- return result.get();
+ long result = accumulate((cd, v) -> ((ComplexColumnData)
cd).complexDeletion().isLive() ? 0 : Long.MAX_VALUE,
+ COLUMN_COMPARATOR, isStatic() ?
FIRST_COMPLEX_STATIC : FIRST_COMPLEX_REGULAR, 0L);
+ return result == Long.MAX_VALUE;
}
public Row markCounterLocalToBeCleared()
@@ -375,6 +393,15 @@ public class BTreeRow extends AbstractRow
return nowInSec >= minLocalDeletionTime;
}
+ public boolean hasInvalidDeletions()
+ {
+ if (primaryKeyLivenessInfo().isExpiring() &&
(primaryKeyLivenessInfo().ttl() < 0 ||
primaryKeyLivenessInfo().localExpirationTime() < 0))
+ return true;
+ if (!deletion().time().validate())
+ return true;
+ return accumulate((cd, v) -> cd.hasInvalidDeletions() ? Long.MAX_VALUE
: v, 0) != 0;
+ }
+
/**
* Returns a copy of the row where all timestamps for live data have
replaced by {@code newTimestamp} and
* all deletion timestamp by {@code newTimestamp - 1}.
@@ -440,9 +467,7 @@ public class BTreeRow extends AbstractRow
+ primaryKeyLivenessInfo.dataSize()
+ deletion.dataSize();
- for (ColumnData cd : this)
- dataSize += cd.dataSize();
- return dataSize;
+ return Ints.checkedCast(accumulate((cd, v) -> v + cd.dataSize(),
dataSize));
}
public long unsharedHeapSizeExcludingData()
@@ -451,9 +476,7 @@ public class BTreeRow extends AbstractRow
+ clustering.unsharedHeapSizeExcludingData()
+ BTree.sizeOfStructureOnHeap(btree);
- for (ColumnData cd : this)
- heapSize += cd.unsharedHeapSizeExcludingData();
- return heapSize;
+ return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(),
heapSize);
}
public static Row.Builder sortedBuilder()
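
Several of the BTreeRow changes above rely on one convention: BTree.accumulate treats Long.MAX_VALUE as an early-exit sentinel. That is why the minDeletionTime lambda remaps Integer.MIN_VALUE (the floor, below which no further element can matter) to Long.MAX_VALUE and maps it back after the call, and why hasComplexDeletion returns Long.MAX_VALUE the moment one non-live complex deletion is found, after seeking straight to the first complex column via COLUMN_COMPARATOR and the FIRST_COMPLEX_STATIC/FIRST_COMPLEX_REGULAR sentinel values. A sketch of that contract over a plain Iterable (the real method walks the btree nodes in place, without an iterator):

    // Sketch, assuming the sentinel convention implied by the call sites above.
    static <T> long accumulate(Iterable<T> elements, LongAccumulator<T> f, long initial)
    {
        long value = initial;
        for (T element : elements)
        {
            value = f.apply(element, value);
            if (value == Long.MAX_VALUE)
                break; // sentinel: the accumulator asked to stop scanning
        }
        return value;
    }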
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 300bbce..959676a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -214,7 +214,7 @@ public abstract class Cell extends ColumnData
header.getType(column).writeValue(cell.value(), out);
}
- public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, SerializationHelper helper) throws IOException
+ public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnMetadata column, SerializationHeader header, DeserializationHelper helper) throws IOException
{
int flags = in.readUnsignedByte();
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java
b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index e5f5550..36aad97 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -78,6 +78,11 @@ public abstract class ColumnData
*/
public abstract void digest(Digest digest);
+ public static void digest(Digest digest, ColumnData cd)
+ {
+ cd.digest(digest);
+ }
+
/**
* Returns a copy of the data where all timestamps for live data have
replaced by {@code newTimestamp} and
* all deletion timestamp by {@code newTimestamp - 1}.
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 832167f..5b03504 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -30,8 +30,11 @@ import org.apache.cassandra.db.LivenessInfo;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.marshal.ByteType;
import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.btree.BTree;
@@ -60,11 +63,6 @@ public class ComplexColumnData extends ColumnData implements
Iterable<Cell>
this.complexDeletion = complexDeletion;
}
- public boolean hasCells()
- {
- return !BTree.isEmpty(cells);
- }
-
public int cellsCount()
{
return BTree.size(cells);
@@ -106,6 +104,16 @@ public class ComplexColumnData extends ColumnData
implements Iterable<Cell>
return BTree.iterator(cells, BTree.Dir.DESC);
}
+ public long accumulate(LongAccumulator<Cell> accumulator, long
initialValue)
+ {
+ return BTree.accumulate(cells, accumulator, initialValue);
+ }
+
+ public <A> long accumulate(BiLongAccumulator<A, Cell> accumulator, A arg,
long initialValue)
+ {
+ return BTree.accumulate(cells, accumulator, arg, initialValue);
+ }
+
public int dataSize()
{
int size = complexDeletion.dataSize();
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
similarity index 95%
copy from src/java/org/apache/cassandra/db/rows/SerializationHelper.java
copy to src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
index db23cb8..386e6ef 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.schema.DroppedColumn;
-public class SerializationHelper
+public class DeserializationHelper
{
/**
* Flag affecting deserialization behavior (this only affect counters in
practice).
@@ -56,7 +56,7 @@ public class SerializationHelper
private DroppedColumn currentDroppedComplex;
- public SerializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch)
+ public DeserializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch)
{
this.flag = flag;
this.version = version;
@@ -65,7 +65,7 @@ public class SerializationHelper
this.hasDroppedColumns = droppedColumns.size() > 0;
}
- public SerializationHelper(TableMetadata metadata, int version, Flag flag)
+ public DeserializationHelper(TableMetadata metadata, int version, Flag flag)
{
this(metadata, version, flag, null);
}
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java
b/src/java/org/apache/cassandra/db/rows/Row.java
index 2f752b8..ee93da4 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -18,15 +18,16 @@
package org.apache.cassandra.db.rows;
import java.util.*;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
-import com.google.common.base.Predicate;
-
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;
@@ -282,12 +283,24 @@ public interface Row extends Unfiltered,
Iterable<ColumnData>
/**
* Apply a function to every column in a row
*/
- public void apply(Consumer<ColumnData> function, boolean reverse);
+ public void apply(Consumer<ColumnData> function);
+
+ /**
+ * Apply a function to every column in a row
+ */
+ public <A> void apply(BiConsumer<A, ColumnData> function, A arg);
/**
- * Apply a funtion to every column in a row until a stop condition is reached
+ * Apply an accumulation function to every column in a row
*/
- public void apply(Consumer<ColumnData> function, Predicate<ColumnData> stopCondition, boolean reverse);
+
+ public long accumulate(LongAccumulator<ColumnData> accumulator, long
initialValue);
+
+ public long accumulate(LongAccumulator<ColumnData> accumulator,
Comparator<ColumnData> comparator, ColumnData from, long initialValue);
+
+ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A
arg, long initialValue);
+
+ public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A
arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue);
/**
* A row deletion/tombstone.
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java
b/src/java/org/apache/cassandra/db/rows/Rows.java
index d62d3b5..58284ac 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.WrappedInt;
/**
* Static utilities to work on Row objects.
@@ -75,6 +74,46 @@ public abstract class Rows
return new SimpleBuilders.RowBuilder(metadata, clusteringValues);
}
+ private static class StatsAccumulation
+ {
+ private static final long COLUMN_INCR = 1L << 32;
+ private static final long CELL_INCR = 1L;
+
+ private static long accumulateOnCell(PartitionStatisticsCollector collector, Cell cell, long l)
+ {
+ Cells.collectStats(cell, collector);
+ return l + CELL_INCR;
+ }
+
+ private static long accumulateOnColumnData(PartitionStatisticsCollector collector, ColumnData cd, long l)
+ {
+ if (cd.column().isSimple())
+ {
+ l = accumulateOnCell(collector, (Cell) cd, l) + COLUMN_INCR;
+ }
+ else
+ {
+ ComplexColumnData complexData = (ComplexColumnData)cd;
+ collector.update(complexData.complexDeletion());
+ int startingCells = unpackCellCount(l);
+ l = complexData.accumulate(StatsAccumulation::accumulateOnCell, collector, l);
+ if (unpackCellCount(l) > startingCells)
+ l += COLUMN_INCR;
+ }
+ return l;
+ }
+
+ private static int unpackCellCount(long v)
+ {
+ return (int) (v & 0xFFFFFFFFL);
+ }
+
+ private static int unpackColumnCount(long v)
+ {
+ return (int) (v >>> 32);
+ }
+ }
+
/**
* Collect statistics on a given row.
*
@@ -89,35 +128,10 @@ public abstract class Rows
collector.update(row.primaryKeyLivenessInfo());
collector.update(row.deletion().time());
- //we have to wrap these for the lambda
- final WrappedInt columnCount = new WrappedInt(0);
- final WrappedInt cellCount = new WrappedInt(0);
-
- row.apply(cd -> {
- if (cd.column().isSimple())
- {
- columnCount.increment();
- cellCount.increment();
- Cells.collectStats((Cell) cd, collector);
- }
- else
- {
- ComplexColumnData complexData = (ComplexColumnData)cd;
- collector.update(complexData.complexDeletion());
- if (complexData.hasCells())
- {
- columnCount.increment();
- for (Cell cell : complexData)
- {
- cellCount.increment();
- Cells.collectStats(cell, collector);
- }
- }
- }
- }, false);
+ long result = row.accumulate(StatsAccumulation::accumulateOnColumnData, collector, 0);
- collector.updateColumnSetPerRow(columnCount.get());
- return cellCount.get();
+ collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result));
+ return StatsAccumulation.unpackCellCount(result);
}
/**
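
StatsAccumulation above packs two counters into the single long that accumulate threads through the row, replacing the two WrappedInt boxes: the column count lives in the high 32 bits (COLUMN_INCR = 1L << 32) and the cell count in the low 32 bits (CELL_INCR = 1L). A standalone worked example of the packing:

    // Worked example of the 32/32-bit packing used by StatsAccumulation.
    final class PackingDemo
    {
        static final long COLUMN_INCR = 1L << 32;
        static final long CELL_INCR = 1L;

        public static void main(String[] args)
        {
            long l = 0;
            l += CELL_INCR;                              // count a cell
            l += CELL_INCR;                              // count another cell
            l += COLUMN_INCR;                            // count the column holding them
            System.out.println((int) (l >>> 32));        // columns: 1
            System.out.println((int) (l & 0xFFFFFFFFL)); // cells: 2
        }
    }

Note also that the old complexData.hasCells() pre-check is gone: accumulateOnColumnData instead compares the packed cell count before and after visiting a complex column and only adds COLUMN_INCR when cells were actually seen.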
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index db23cb8..dca4240 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -15,135 +15,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.cassandra.db.rows;
-import java.nio.ByteBuffer;
-import java.util.*;
+package org.apache.cassandra.db.rows;
+import org.apache.cassandra.db.SerializationHeader;
import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
public class SerializationHelper
{
- /**
- * Flag affecting deserialization behavior (this only affect counters in practice).
- * - LOCAL: for deserialization of local data (Expired columns are
- * converted to tombstones (to gain disk space)).
- * - FROM_REMOTE: for deserialization of data received from remote hosts
- * (Expired columns are converted to tombstone and counters have
- * their delta cleared)
- * - PRESERVE_SIZE: used when no transformation must be performed, i.e,
- * when we must ensure that deserializing and reserializing the
- * result yield the exact same bytes. Streaming uses this.
- */
- public enum Flag
- {
- LOCAL, FROM_REMOTE, PRESERVE_SIZE
- }
-
- private final Flag flag;
- public final int version;
-
- private final ColumnFilter columnsToFetch;
- private ColumnFilter.Tester tester;
-
- private final boolean hasDroppedColumns;
- private final Map<ByteBuffer, DroppedColumn> droppedColumns;
- private DroppedColumn currentDroppedComplex;
-
-
- public SerializationHelper(TableMetadata metadata, int version, Flag flag, ColumnFilter columnsToFetch)
- {
- this.flag = flag;
- this.version = version;
- this.columnsToFetch = columnsToFetch;
- this.droppedColumns = metadata.droppedColumns;
- this.hasDroppedColumns = droppedColumns.size() > 0;
- }
-
- public SerializationHelper(TableMetadata metadata, int version, Flag flag)
- {
- this(metadata, version, flag, null);
- }
+ public final SerializationHeader header;
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null;
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars =
null;
- public boolean includes(ColumnMetadata column)
+ public SerializationHelper(SerializationHeader header)
{
- return columnsToFetch == null || columnsToFetch.fetches(column);
+ this.header = header;
}
- public boolean includes(Cell cell, LivenessInfo rowLiveness)
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics()
{
- if (columnsToFetch == null)
- return true;
-
- // During queries, some columns are included even though they are not queried by the user because
- // we always need to distinguish between having a row (with potentially only null values) and not
- // having a row at all (see #CASSANDRA-7085 for background). In the case where the column is not
- // actually requested by the user however (canSkipValue), we can skip the full cell if the cell
- // timestamp is lower than the row one, because in that case, the row timestamp is enough proof
- // of the liveness of the row. Otherwise, we'll only be able to skip the values of those cells.
- ColumnMetadata column = cell.column();
- if (column.isComplex())
- {
- if (!includes(cell.path()))
- return false;
-
- return !canSkipValue(cell.path()) || cell.timestamp() >=
rowLiveness.timestamp();
- }
- else
- {
- return columnsToFetch.fetchedColumnIsQueried(column) || cell.timestamp() >= rowLiveness.timestamp();
- }
- }
-
- public boolean includes(CellPath path)
- {
- return path == null || tester == null || tester.fetches(path);
- }
-
- public boolean canSkipValue(ColumnMetadata column)
- {
- return columnsToFetch != null &&
!columnsToFetch.fetchedColumnIsQueried(column);
- }
-
- public boolean canSkipValue(CellPath path)
- {
- return path != null && tester != null &&
!tester.fetchedCellIsQueried(path);
- }
-
- public void startOfComplexColumn(ColumnMetadata column)
- {
- this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column);
- this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
- }
-
- public void endOfComplexColumn()
- {
- this.tester = null;
- }
-
- public boolean isDropped(Cell cell, boolean isComplex)
- {
- if (!hasDroppedColumns)
- return false;
-
- DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes);
- return dropped != null && cell.timestamp() <= dropped.droppedTime;
+ if (statics == null)
+ statics = header.columns().statics.iterator();
+ return statics;
}
- public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
+ private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars()
{
- return currentDroppedComplex != null &&
complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
+ if (regulars == null)
+ regulars = header.columns().regulars.iterator();
+ return regulars;
}
- public ByteBuffer maybeClearCounterValue(ByteBuffer value)
+ public SearchIterator<ColumnMetadata, ColumnMetadata> iterator(boolean
isStatic)
{
- return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL &&
CounterContext.instance().shouldClearLocal(value))
- ? CounterContext.instance().clearAllLocal(value)
- : value;
+ BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator =
isStatic ? statics() : regulars();
+ iterator.rewind();
+ return iterator;
}
}
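
This rewritten SerializationHelper is the heart of the allocation win named in the commit title: it builds at most one BTreeSearchIterator per column set and rewinds it for every row, where the serializers previously called headerColumns.iterator() (a fresh allocation) once per row. The BTreeSearchIterator and LeafBTreeSearchIterator entries in the diffstat add the rewind() support this depends on. The reuse pattern in miniature, with made-up names:

    // Sketch of the reuse pattern; RewindableIterator and IteratorCache are illustrative stand-ins.
    interface RewindableIterator<T>
    {
        boolean hasNext();
        T next();
        void rewind(); // reset to the start without reallocating
    }

    final class IteratorCache<T>
    {
        private final java.util.function.Supplier<RewindableIterator<T>> factory;
        private RewindableIterator<T> cached;

        IteratorCache(java.util.function.Supplier<RewindableIterator<T>> factory)
        {
            this.factory = factory;
        }

        RewindableIterator<T> get()
        {
            if (cached == null)
                cached = factory.get(); // allocate once
            cached.rewind();            // reuse for every subsequent row
            return cached;
        }
    }

A consequence of caching mutable iterators is that a helper instance cannot be shared across threads, which fits how the diff creates one per writer (ColumnIndex) or per serialize call (UnfilteredRowIteratorSerializer).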
diff --git
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 7cac5e6..df67754 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -123,18 +123,19 @@ public class UnfilteredRowIteratorSerializer
out.writeByte((byte)flags);
SerializationHeader.serializer.serializeForMessaging(header,
selection, out, hasStatic);
+ SerializationHelper helper = new SerializationHelper(header);
if (!partitionDeletion.isLive())
header.writeDeletionTime(partitionDeletion, out);
if (hasStatic)
- UnfilteredSerializer.serializer.serialize(staticRow, header, out, version);
+ UnfilteredSerializer.serializer.serialize(staticRow, helper, out, version);
if (rowEstimate >= 0)
out.writeUnsignedVInt(rowEstimate);
while (iterator.hasNext())
- UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
+ UnfilteredSerializer.serializer.serialize(iterator.next(), helper, out, version);
UnfilteredSerializer.serializer.writeEndOfPartition(out);
}
@@ -147,6 +148,8 @@ public class UnfilteredRowIteratorSerializer
iterator.columns(),
iterator.stats());
+ SerializationHelper helper = new SerializationHelper(header);
+
assert rowEstimate >= 0;
long size =
ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey())
@@ -165,19 +168,19 @@ public class UnfilteredRowIteratorSerializer
size += header.deletionTimeSerializedSize(partitionDeletion);
if (hasStatic)
- size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version);
+ size += UnfilteredSerializer.serializer.serializedSize(staticRow, helper, version);
if (rowEstimate >= 0)
size += TypeSizes.sizeofUnsignedVInt(rowEstimate);
while (iterator.hasNext())
- size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version);
+ size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), helper, version);
size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition();
return size;
}
- public Header deserializeHeader(TableMetadata metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
+ public Header deserializeHeader(TableMetadata metadata, ColumnFilter selection, DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException
{
DecoratedKey key =
metadata.partitioner.decorateKey(ByteBufferUtil.readWithVIntLength(in));
int flags = in.readUnsignedByte();
@@ -198,18 +201,18 @@ public class UnfilteredRowIteratorSerializer
Row staticRow = Rows.EMPTY_STATIC_ROW;
if (hasStatic)
- staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
+ staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new DeserializationHelper(metadata, version, flag));
int rowEstimate = hasRowEstimate ? (int)in.readUnsignedVInt() : -1;
return new Header(header, key, isReversed, false, partitionDeletion,
staticRow, rowEstimate);
}
- public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, SerializationHelper.Flag flag, Header header) throws IOException
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, DeserializationHelper.Flag flag, Header header) throws IOException
{
if (header.isEmpty)
return EmptyIterators.unfilteredRow(metadata, header.key,
header.isReversed);
- final SerializationHelper helper = new SerializationHelper(metadata, version, flag);
+ final DeserializationHelper helper = new DeserializationHelper(metadata, version, flag);
final SerializationHeader sHeader = header.sHeader;
return new AbstractUnfilteredRowIterator(metadata, header.key,
header.partitionDeletion, sHeader.columns(), header.staticRow,
header.isReversed, sHeader.stats())
{
@@ -230,7 +233,7 @@ public class UnfilteredRowIteratorSerializer
};
}
- public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException
+ public UnfilteredRowIterator deserialize(DataInputPlus in, int version, TableMetadata metadata, ColumnFilter selection, DeserializationHelper.Flag flag) throws IOException
{
return deserialize(in, version, metadata, flag,
deserializeHeader(metadata, selection, in, version, flag));
}
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 7b48652..a5fad14 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
-import com.google.common.collect.Collections2;
-
import net.nicoulaj.compilecommand.annotations.Inline;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.db.*;
@@ -118,40 +116,41 @@ public class UnfilteredSerializer
@Deprecated
private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the
row deletion is shadowable. If there is no extended flag (or no row deletion),
the deletion is assumed not shadowable.
- public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version)
+ public void serialize(Unfiltered unfiltered, SerializationHelper helper, DataOutputPlus out, int version)
throws IOException
{
- assert !header.isForSSTable();
- serialize(unfiltered, header, out, 0, version);
+ assert !helper.header.isForSSTable();
+ serialize(unfiltered, helper, out, 0, version);
}
- public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+ public void serialize(Unfiltered unfiltered, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
{
- serialize((RangeTombstoneMarker) unfiltered, header, out, previousUnfilteredSize, version);
+ serialize((RangeTombstoneMarker) unfiltered, helper, out, previousUnfilteredSize, version);
}
else
{
- serialize((Row) unfiltered, header, out, previousUnfilteredSize, version);
+ serialize((Row) unfiltered, helper, out, previousUnfilteredSize, version);
}
}
- public void serializeStaticRow(Row row, SerializationHeader header, DataOutputPlus out, int version)
+ public void serializeStaticRow(Row row, SerializationHelper helper, DataOutputPlus out, int version)
throws IOException
{
assert row.isStatic();
- serialize(row, header, out, 0, version);
+ serialize(row, helper, out, 0, version);
}
- private void serialize(Row row, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+ private void serialize(Row row, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
int flags = 0;
int extendedFlags = 0;
boolean isStatic = row.isStatic();
+ SerializationHeader header = helper.header;
Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
@@ -191,7 +190,7 @@ public class UnfilteredSerializer
{
try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
{
- serializeRowBody(row, flags, header, dob);
+ serializeRowBody(row, flags, helper, dob);
out.writeUnsignedVInt(dob.position() +
TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize));
// We write the size of the previous unfiltered to make
reverse queries more efficient (and simpler).
@@ -202,16 +201,17 @@ public class UnfilteredSerializer
}
else
{
- serializeRowBody(row, flags, header, out);
+ serializeRowBody(row, flags, helper, out);
}
}
@Inline
- private void serializeRowBody(Row row, int flags, SerializationHeader header, DataOutputPlus out)
+ private void serializeRowBody(Row row, int flags, SerializationHelper helper, DataOutputPlus out)
throws IOException
{
boolean isStatic = row.isStatic();
+ SerializationHeader header = helper.header;
Columns headerColumns = header.columns(isStatic);
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
@@ -229,7 +229,7 @@ public class UnfilteredSerializer
if ((flags & HAS_ALL_COLUMNS) == 0)
Columns.serializer.serializeSubset(row.columns(), headerColumns,
out);
- SearchIterator<ColumnMetadata, ColumnMetadata> si =
headerColumns.iterator();
+ SearchIterator<ColumnMetadata, ColumnMetadata> si =
helper.iterator(isStatic);
try
{
@@ -253,7 +253,7 @@ public class UnfilteredSerializer
{
throw new WrappedException(e);
}
- }, false);
+ });
}
catch (WrappedException e)
{
@@ -275,9 +275,10 @@ public class UnfilteredSerializer
Cell.serializer.serialize(cell, column, out, rowLiveness, header);
}
- private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version)
+ private void serialize(RangeTombstoneMarker marker, SerializationHelper helper, DataOutputPlus out, long previousUnfilteredSize, int version)
throws IOException
{
+ SerializationHeader header = helper.header;
out.writeByte((byte)IS_MARKER);
ClusteringBoundOrBoundary.serializer.serialize(marker.clustering(),
out, version, header.clusteringTypes());
@@ -299,20 +300,20 @@ public class UnfilteredSerializer
}
}
- public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version)
+ public long serializedSize(Unfiltered unfiltered, SerializationHelper helper, int version)
{
- assert !header.isForSSTable();
- return serializedSize(unfiltered, header, 0, version);
+ assert !helper.header.isForSSTable();
+ return serializedSize(unfiltered, helper, 0, version);
}
- public long serializedSize(Unfiltered unfiltered, SerializationHeader header, long previousUnfilteredSize, int version)
+ public long serializedSize(Unfiltered unfiltered, SerializationHelper helper, long previousUnfilteredSize, int version)
{
return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER
- ? serializedSize((RangeTombstoneMarker) unfiltered, header, previousUnfilteredSize, version)
- : serializedSize((Row) unfiltered, header, previousUnfilteredSize, version);
+ ? serializedSize((RangeTombstoneMarker) unfiltered, helper, previousUnfilteredSize, version)
+ : serializedSize((Row) unfiltered, helper, previousUnfilteredSize, version);
}
- private long serializedSize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
+ private long serializedSize(Row row, SerializationHelper helper, long previousUnfilteredSize, int version)
{
long size = 1; // flags
@@ -320,15 +321,16 @@ public class UnfilteredSerializer
size += 1; // extended flags
if (!row.isStatic())
- size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes());
+ size += Clustering.serializer.serializedSize(row.clustering(), version, helper.header.clusteringTypes());
- return size + serializedRowBodySize(row, header, previousUnfilteredSize, version);
+ return size + serializedRowBodySize(row, helper, previousUnfilteredSize, version);
}
- private long serializedRowBodySize(Row row, SerializationHeader header, long previousUnfilteredSize, int version)
+ private long serializedRowBodySize(Row row, SerializationHelper helper, long previousUnfilteredSize, int version)
{
long size = 0;
+ SerializationHeader header = helper.header;
if (header.isForSSTable())
size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
@@ -352,19 +354,16 @@ public class UnfilteredSerializer
if (!hasAllColumns)
size += Columns.serializer.serializedSubsetSize(row.columns(),
header.columns(isStatic));
- SearchIterator<ColumnMetadata, ColumnMetadata> si =
headerColumns.iterator();
- for (ColumnData data : row)
- {
+ SearchIterator<ColumnMetadata, ColumnMetadata> si =
helper.iterator(isStatic);
+ return row.accumulate((data, v) -> {
ColumnMetadata column = si.next(data.column());
assert column != null;
if (data.column.isSimple())
- size += Cell.serializer.serializedSize((Cell) data, column,
pkLiveness, header);
+ return v + Cell.serializer.serializedSize((Cell) data, column,
pkLiveness, header);
else
- size += sizeOfComplexColumn((ComplexColumnData) data, column,
hasComplexDeletion, pkLiveness, header);
- }
-
- return size;
+ return v + sizeOfComplexColumn((ComplexColumnData) data,
column, hasComplexDeletion, pkLiveness, header);
+ }, size);
}
private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata
column, boolean hasComplexDeletion, LivenessInfo rowLiveness,
SerializationHeader header)
@@ -381,12 +380,12 @@ public class UnfilteredSerializer
return size;
}
- private long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, long previousUnfilteredSize, int version)
+ private long serializedSize(RangeTombstoneMarker marker, SerializationHelper helper, long previousUnfilteredSize, int version)
{
- assert !header.isForSSTable();
+ assert !helper.header.isForSSTable();
return 1 // flags
- + ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes())
- + serializedMarkerBodySize(marker, header, previousUnfilteredSize, version);
+ + ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), version, helper.header.clusteringTypes())
+ + serializedMarkerBodySize(marker, helper.header, previousUnfilteredSize, version);
}
private long serializedMarkerBodySize(RangeTombstoneMarker marker,
SerializationHeader header, long previousUnfilteredSize, int version)
@@ -428,7 +427,7 @@ public class UnfilteredSerializer
* @return the deserialized {@link Unfiltered} or {@code null} if we've
read the end of a partition. This method is
* guaranteed to never return empty rows.
*/
- public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder)
+ public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder)
throws IOException
{
while (true)
@@ -453,7 +452,7 @@ public class UnfilteredSerializer
* But as {@link UnfilteredRowIterator} should not return empty
* rows, this mean consumer of this method should make sure to skip said
empty rows.
*/
- private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder)
+ private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder)
throws IOException
{
// It wouldn't be wrong per-se to use an unsorted builder, but it
would be inefficient so make sure we don't do it by mistake
@@ -481,7 +480,7 @@ public class UnfilteredSerializer
}
}
- public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, SerializationHelper helper)
+ public Unfiltered deserializeTombstonesOnly(FileDataInput in, SerializationHeader header, DeserializationHelper helper)
throws IOException
{
while (true)
@@ -533,7 +532,7 @@ public class UnfilteredSerializer
}
}
-   public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+   public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, DeserializationHelper helper)
throws IOException
{
int flags = in.readUnsignedByte();
@@ -561,7 +560,7 @@ public class UnfilteredSerializer
public Row deserializeRowBody(DataInputPlus in,
SerializationHeader header,
- SerializationHelper helper,
+ DeserializationHelper helper,
int flags,
int extendedFlags,
Row.Builder builder)
@@ -614,7 +613,7 @@ public class UnfilteredSerializer
{
throw new WrappedException(e);
}
- }, false);
+ });
}
catch (WrappedException e)
{
@@ -636,7 +635,7 @@ public class UnfilteredSerializer
}
}
-   private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness)
+   private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness)
throws IOException
{
if (helper.includes(column))
@@ -651,7 +650,7 @@ public class UnfilteredSerializer
}
}
-   private void readComplexColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
+   private void readComplexColumn(ColumnMetadata column, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
throws IOException
{
if (helper.includes(column))
@@ -686,7 +685,7 @@ public class UnfilteredSerializer
in.skipBytesFully(rowSize);
}
-   public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException
+   public void skipStaticRow(DataInputPlus in, SerializationHeader header, DeserializationHelper helper) throws IOException
{
int flags = in.readUnsignedByte();
        assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isExtended(flags) : "Flags is " + flags;
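
The serializedSize hunk above is the heart of the patch: rather than allocating a BTree iterator for each row sizing pass (the removed for-each), the running total is threaded through Row.accumulate as a long. A minimal, self-contained sketch of the pattern, using stand-in types rather than the real Cassandra classes:

    // Illustrative only: SketchRow stands in for BTreeRow, String for ColumnData.
    public class AccumulatePatternSketch
    {
        interface LongAccumulator<T>
        {
            long apply(T obj, long v);
        }

        static final class SketchRow
        {
            private final String[] cells;   // stand-in for the row's ColumnData tree

            SketchRow(String... cells) { this.cells = cells; }

            // walk the backing structure directly, threading the running value through
            long accumulate(LongAccumulator<String> accumulator, long initialValue)
            {
                long v = initialValue;
                for (String cell : cells)
                    v = accumulator.apply(cell, v);
                return v;
            }
        }

        public static void main(String[] args)
        {
            SketchRow row = new SketchRow("a", "bb", "ccc");
            // before: long size = 0; for (String cell : row) size += cell.length();
            // after: no Iterator allocation; the size rides along as the accumulated value
            long size = row.accumulate((cell, v) -> v + cell.length(), 0L);
            System.out.println(size);   // 6
        }
    }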
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 190f136..686d874 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -181,7 +181,7 @@ public class CassandraStreamReader implements IStreamReader
private final TableMetadata metadata;
private final DataInputPlus in;
private final SerializationHeader header;
- private final SerializationHelper helper;
+ private final DeserializationHelper helper;
private DecoratedKey key;
private DeletionTime partitionLevelDeletion;
@@ -193,7 +193,7 @@ public class CassandraStreamReader implements IStreamReader
{
this.metadata = metadata;
this.in = in;
-       this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
+       this.helper = new DeserializationHelper(metadata, version.correspondingMessagingVersion(), DeserializationHelper.Flag.PRESERVE_SIZE);
this.header = header;
}
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 1846aa5..76e12c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -57,7 +57,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(file);
            if (!partitionLevelDeletion.validate())
                UnfilteredValidation.handleInvalid(sstable.metadata(), key, sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
-           SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+           DeserializationHelper helper = new DeserializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL);
            SSTableSimpleIterator iterator = SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, partitionLevelDeletion);
            return new SSTableIdentityIterator(sstable, key, partitionLevelDeletion, file.getPath(), iterator);
        }
@@ -76,7 +76,7 @@ public class SSTableIdentityIterator implements Comparable<SSTableIdentityIterat
            dfile.seek(indexEntry.position);
            ByteBufferUtil.skipShortLength(dfile); // Skip partition key
            DeletionTime partitionLevelDeletion = DeletionTime.serializer.deserialize(dfile);
-           SerializationHelper helper = new SerializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL);
+           DeserializationHelper helper = new DeserializationHelper(sstable.metadata(), sstable.descriptor.version.correspondingMessagingVersion(), DeserializationHelper.Flag.LOCAL);
            SSTableSimpleIterator iterator = tombstoneOnly
                                           ? SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile, sstable.header, helper, partitionLevelDeletion)
                                           : SSTableSimpleIterator.create(sstable.metadata(), dfile, sstable.header, helper, partitionLevelDeletion);
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index c3c7472..fd1b6a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -38,21 +38,21 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
 {
    final TableMetadata metadata;
    protected final DataInputPlus in;
-   protected final SerializationHelper helper;
+   protected final DeserializationHelper helper;
-   private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, SerializationHelper helper)
+   private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, DeserializationHelper helper)
    {
        this.metadata = metadata;
        this.in = in;
        this.helper = helper;
    }
-   public static SSTableSimpleIterator create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+   public static SSTableSimpleIterator create(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, DeletionTime partitionDeletion)
    {
        return new CurrentFormatIterator(metadata, in, header, helper);
    }
-   public static SSTableSimpleIterator createTombstoneOnly(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper, DeletionTime partitionDeletion)
+   public static SSTableSimpleIterator createTombstoneOnly(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper, DeletionTime partitionDeletion)
    {
        return new CurrentFormatTombstoneIterator(metadata, in, header, helper);
    }
@@ -65,7 +65,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
        private final Row.Builder builder;
-       private CurrentFormatIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+       private CurrentFormatIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper)
        {
            super(metadata, in, helper);
            this.header = header;
@@ -95,7 +95,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered>
    {
        private final SerializationHeader header;
-       private CurrentFormatTombstoneIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+       private CurrentFormatTombstoneIterator(TableMetadata metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper helper)
        {
            super(metadata, in, helper);
            this.header = header;
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 369be12..7ac2ebc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -31,6 +31,7 @@ import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.schema.TableMetadataRef;
@@ -56,6 +57,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
    // Used to compute the row serialized size
    private final SerializationHeader header;
+   private final SerializationHelper helper;
    private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>();
    private final DiskWriter diskWriter = new DiskWriter();
@@ -65,6 +67,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
        super(directory, metadata, columns);
        this.bufferSize = bufferSizeInMB * 1024L * 1024L;
        this.header = new SerializationHeader(true, metadata.get(), columns, EncodingStats.NO_STATS);
+       this.helper = new SerializationHelper(this.header);
        diskWriter.start();
    }
@@ -90,7 +93,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
        // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing
        // to count since it will take a lot more space in memory and the bufferSize is first and foremost used to avoid OOM when
        // using this writer.
-       currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
+       currentSize += UnfilteredSerializer.serializer.serializedSize(row, helper, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
    }
private void maybeSync() throws SyncException
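
The writer change above applies the same idea one level up: the SerializationHelper is built once alongside the header and reused for every serializedSize call, instead of materializing fresh column-iterator state per appended row. In miniature (all names here are stand-ins, not the real classes):

    public class ReusedHelperSketch
    {
        static final class Helper {}   // stand-in for SerializationHelper

        static final class SizeCountingWriter
        {
            private final Helper helper = new Helper();   // built once with the writer
            long currentSize = 0;

            void countRow(String row)
            {
                currentSize += serializedSize(row, helper);   // helper reused per row
            }

            private static long serializedSize(String row, Helper helper)
            {
                return row.length();   // placeholder for the real size computation
            }
        }

        public static void main(String[] args)
        {
            SizeCountingWriter writer = new SizeCountingWriter();
            writer.countRow("abc");
            writer.countRow("de");
            System.out.println(writer.currentSize);   // 5
        }
    }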
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 422eaa8..ed0eb9b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -117,7 +117,7 @@ public class Commit
        public Commit deserialize(DataInputPlus in, int version) throws IOException
        {
            UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
-           PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, SerializationHelper.Flag.LOCAL);
+           PartitionUpdate update = PartitionUpdate.serializer.deserialize(in, version, DeserializationHelper.Flag.LOCAL);
            return new Commit(ballot, update);
}
diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java
similarity index 67%
copy from src/java/org/apache/cassandra/utils/WrappedInt.java
copy to src/java/org/apache/cassandra/utils/BiLongAccumulator.java
index a106575..2c3d6b5 100644
--- a/src/java/org/apache/cassandra/utils/WrappedInt.java
+++ b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java
@@ -18,35 +18,7 @@
package org.apache.cassandra.utils;
-/**
- * Simple wrapper for native int type
- */
-public class WrappedInt
+public interface BiLongAccumulator<T, A>
{
- private int value;
-
- public WrappedInt(int initial)
- {
- this.value = initial;
- }
-
- public int get()
- {
- return value;
- }
-
- public void set(int value)
- {
- this.value = value;
- }
-
- public void increment()
- {
- ++value;
- }
-
- public void decrement()
- {
- --value;
- }
+    long apply(T obj, A argument, long v);
}
diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java b/src/java/org/apache/cassandra/utils/LongAccumulator.java
similarity index 67%
rename from src/java/org/apache/cassandra/utils/WrappedInt.java
rename to src/java/org/apache/cassandra/utils/LongAccumulator.java
index a106575..fe3c195 100644
--- a/src/java/org/apache/cassandra/utils/WrappedInt.java
+++ b/src/java/org/apache/cassandra/utils/LongAccumulator.java
@@ -18,35 +18,7 @@
package org.apache.cassandra.utils;
-/**
- * Simple wrapper for native int type
- */
-public class WrappedInt
+public interface LongAccumulator<T>
{
- private int value;
-
- public WrappedInt(int initial)
- {
- this.value = initial;
- }
-
- public int get()
- {
- return value;
- }
-
- public void set(int value)
- {
- this.value = value;
- }
-
- public void increment()
- {
- ++value;
- }
-
- public void decrement()
- {
- --value;
- }
+ long apply(T obj, long v);
}
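
Between them, these two interfaces replace the mutable WrappedInt-plus-Consumer idiom. LongAccumulator folds each element into a running long; BiLongAccumulator additionally threads an explicit argument through, so call sites can use a stateless (non-capturing, hence non-allocating) lambda. A runnable sketch of the distinction; the accumulate drivers and list stand-ins here are illustrative, not the BTree code:

    import java.util.Arrays;
    import java.util.List;

    public class AccumulatorShapes
    {
        interface LongAccumulator<T> { long apply(T obj, long v); }
        interface BiLongAccumulator<T, A> { long apply(T obj, A argument, long v); }

        static <T> long accumulate(List<T> values, LongAccumulator<T> f, long initial)
        {
            long v = initial;
            for (T t : values)
                v = f.apply(t, v);
            return v;
        }

        static <T, A> long accumulate(List<T> values, BiLongAccumulator<T, A> f, A arg, long initial)
        {
            long v = initial;
            for (T t : values)
                v = f.apply(t, arg, v);
            return v;
        }

        public static void main(String[] args)
        {
            List<String> cells = Arrays.asList("a", "bb", "ccc");
            int factor = 2;
            // capturing lambda: correct, but capturing `factor` can allocate per use
            long a = accumulate(cells, (s, v) -> v + (long) factor * s.length(), 0L);
            // bi-form: `factor` is threaded explicitly, so the lambda stays stateless
            long b = accumulate(cells, (s, arg, v) -> v + (long) arg * s.length(), factor, 0L);
            System.out.println(a + " " + b);   // 12 12
        }
    }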
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 97e935e..6c546ac 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -19,14 +19,18 @@
package org.apache.cassandra.utils.btree;
import java.util.*;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Ordering;
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.ObjectSizes;
import static com.google.common.collect.Iterables.concat;
@@ -691,7 +695,7 @@ public class BTree
}
// returns true if the provided node is a leaf, false if it is a branch
- static boolean isLeaf(Object[] node)
+ public static boolean isLeaf(Object[] node)
{
return (node.length & 1) == 1;
}
@@ -1285,36 +1289,46 @@ public class BTree
return compare(cmp, previous, max) < 0;
}
-   /**
-    * Simple method to walk the btree forwards or reversed and apply a function to each element
-    *
-    * Public method
-    *
-    */
-   public static <V> void apply(Object[] btree, Consumer<V> function, boolean reversed)
+   private static <V, A> void applyValue(V value, BiConsumer<A, V> function, A argument)
    {
-       if (reversed)
-           applyReverse(btree, function, null);
-       else
-           applyForwards(btree, function, null);
+       function.accept(argument, value);
+   }
+
+   public static <V, A> void applyLeaf(Object[] btree, BiConsumer<A, V> function, A argument)
+   {
+       Preconditions.checkArgument(isLeaf(btree));
+       int limit = getLeafKeyEnd(btree);
+       for (int i=0; i<limit; i++)
+           applyValue((V) btree[i], function, argument);
    }
    /**
-    * Simple method to walk the btree forwards or reversed and apply a function till a stop condition is reached
+    * Simple method to walk the btree forwards and apply a function to each element
     *
     * Public method
     *
+    * @param btree
+    * @param function
     */
-   public static <V> void apply(Object[] btree, Consumer<V> function, Predicate<V> stopCondition, boolean reversed)
+   public static <V, A> void apply(Object[] btree, BiConsumer<A, V> function, A argument)
    {
-       if (reversed)
-           applyReverse(btree, function, stopCondition);
-       else
-           applyForwards(btree, function, stopCondition);
-   }
+       if (isLeaf(btree))
+       {
+           applyLeaf(btree, function, argument);
+           return;
+       }
+       int childOffset = getChildStart(btree);
+       int limit = btree.length - 1 - childOffset;
+       for (int i = 0 ; i < limit ; i++)
+       {
+           apply((Object[]) btree[childOffset + i], function, argument);
+           if (i < childOffset)
+               applyValue((V) btree[i], function, argument);
+       }
+   }
/**
     * Simple method to walk the btree forwards and apply a function to each element
@@ -1323,89 +1337,115 @@ public class BTree
*
* @param btree
* @param function
- * @param stopCondition
*/
-   private static <V> boolean applyForwards(Object[] btree, Consumer<V> function, Predicate<V> stopCondition)
+   public static <V> void apply(Object[] btree, Consumer<V> function)
{
- boolean isLeaf = isLeaf(btree);
- int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree);
- int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
- for (int i = 0 ; i < limit ; i++)
- {
-           // we want to visit in iteration order, so we visit our key nodes inbetween our children
- int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0);
- Object current = btree[idx];
- if (idx < childOffset)
- {
- V castedCurrent = (V) current;
-               if (stopCondition != null && stopCondition.apply(castedCurrent))
- return true;
+ BTree.<V, Consumer<V>>apply(btree, Consumer::accept, function);
+ }
- function.accept(castedCurrent);
- }
- else
- {
- if (applyForwards((Object[]) current, function, stopCondition))
- return true;
- }
+   private static <V> int find(Object[] btree, V from, Comparator<V> comparator)
+ {
+ // find the start index in iteration order
+ Preconditions.checkNotNull(comparator);
+ int keyEnd = getKeyEnd(btree);
+ return Arrays.binarySearch((V[]) btree, 0, keyEnd, from, comparator);
+ }
+
+ private static boolean isStopSentinel(long v)
+ {
+ return v == Long.MAX_VALUE;
+ }
+
+   private static <V, A> long accumulateLeaf(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, Comparator<V> comparator, V from, long initialValue)
+ {
+ Preconditions.checkArgument(isLeaf(btree));
+ long value = initialValue;
+ int limit = getLeafKeyEnd(btree);
+
+ int startIdx = 0;
+ if (from != null)
+ {
+ int i = find(btree, from, comparator);
+ boolean isExact = i >= 0;
+ startIdx = isExact ? i : (-1 - i);
}
- return false;
+ for (int i = startIdx; i < limit; i++)
+ {
+ value = accumulator.apply(arg, (V) btree[i], value);
+
+ if (isStopSentinel(value))
+ break;
+ }
+ return value;
}
/**
-    * Simple method to walk the btree in reverse and apply a function till a stop condition is reached
+    * Walk the btree and accumulate a long value using the supplied accumulator function. Iteration will stop if the
+    * accumulator function returns the sentinel value Long.MAX_VALUE
     *
-    * Private method
-    *
-    * @param btree
-    * @param function
-    * @param stopCondition
+    * If the optional from argument is not null, iteration will start from that value (or the one after its insertion
+    * point if an exact match isn't found)
     */
-   private static <V> boolean applyReverse(Object[] btree, Consumer<V> function, Predicate<V> stopCondition)
+   public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, Comparator<V> comparator, V from, long initialValue)
{
- boolean isLeaf = isLeaf(btree);
- int childOffset = isLeaf ? 0 : getChildStart(btree);
- int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
- for (int i = limit - 1, visited = 0; i >= 0 ; i--, visited++)
+ if (isLeaf(btree))
+           return accumulateLeaf(btree, accumulator, arg, comparator, from, initialValue);
+
+ long value = initialValue;
+ int childOffset = getChildStart(btree);
+
+ int startChild = 0;
+ if (from != null)
{
- int idx = i;
+ int i = find(btree, from, comparator);
+ boolean isExact = i >= 0;
-           // we want to visit in reverse iteration order, so we visit our children nodes inbetween our keys
- if (!isLeaf)
- {
- int typeOffset = visited / 2;
+ startChild = isExact ? i + 1 : -1 - i;
- if (i % 2 == 0)
- {
-                   // This is a child branch. Since children are in the second half of the array, we must
- // adjust for the key's we've visited along the way
- idx += typeOffset;
- }
- else
- {
-                   // This is a key. Since the keys are in the first half of the array and we are iterating
-                   // in reverse we subtract the childOffset and adjust for children we've walked so far
- idx = i - childOffset + typeOffset;
- }
+ if (isExact)
+ {
+ value = accumulator.apply(arg, (V) btree[i], value);
+ if (isStopSentinel(value))
+ return value;
+ from = null;
}
+ }
- Object current = btree[idx];
- if (isLeaf || idx < childOffset)
- {
- V castedCurrent = (V) current;
-               if (stopCondition != null && stopCondition.apply(castedCurrent))
- return true;
+ int limit = btree.length - 1 - childOffset;
+ for (int i=startChild; i<limit; i++)
+ {
+           value = accumulate((Object[]) btree[childOffset + i], accumulator, arg, comparator, from, value);
- function.accept(castedCurrent);
- }
- else
+ if (isStopSentinel(value))
+ break;
+
+ if (i < childOffset)
{
- if (applyReverse((Object[]) current, function, stopCondition))
- return true;
+ value = accumulator.apply(arg, (V) btree[i], value);
+ // stop if a sentinel stop value was returned
+ if (isStopSentinel(value))
+ break;
}
+
+ if (from != null)
+ from = null;
}
+ return value;
+ }
+
+   public static <V> long accumulate(Object[] btree, LongAccumulator<V> accumulator, Comparator<V> comparator, V from, long initialValue)
+   {
+       return accumulate(btree, LongAccumulator::apply, accumulator, comparator, from, initialValue);
+   }
- return false;
+   public static <V> long accumulate(Object[] btree, LongAccumulator<V> accumulator, long initialValue)
+ {
+ return accumulate(btree, accumulator, null, null, initialValue);
+ }
+
+   public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, V> accumulator, A arg, long initialValue)
+ {
+ return accumulate(btree, accumulator, arg, null, null, initialValue);
}
}
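
A usage sketch of the new entry points (assuming UpdateFunction.noOp() for the build; the accumulate signatures are the ones added above):

    import java.util.Arrays;
    import java.util.Comparator;

    import org.apache.cassandra.utils.btree.BTree;
    import org.apache.cassandra.utils.btree.UpdateFunction;

    public class AccumulateUsageSketch
    {
        public static void main(String[] args)
        {
            // assumes UpdateFunction.noOp() as the no-op update function
            Object[] btree = BTree.build(Arrays.asList(1, 2, 3, 4, 5), UpdateFunction.noOp());

            // fold every element into a running long, no iterator allocated
            long total = BTree.<Integer>accumulate(btree, (i, v) -> v + i, 0L);         // 15

            // start from a given key (here 3), using the comparator overload
            long tail = BTree.<Integer>accumulate(btree, (i, v) -> v + i,
                                                  Comparator.naturalOrder(), 3, 0L);    // 12

            // returning Long.MAX_VALUE halts the walk; note the sentinel itself is
            // what accumulate() returns in that case, so reserve it for "stop"
            long stopped = BTree.<Integer>accumulate(btree, (i, v) -> i >= 3 ? Long.MAX_VALUE : v + i, 0L);

            System.out.println(total + " " + tail + " " + stopped);
        }
    }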
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index 2fcece6..2ad7f40 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -25,4 +25,8 @@ import org.apache.cassandra.utils.IndexedSearchIterator;
public interface BTreeSearchIterator<K, V> extends IndexedSearchIterator<K, V>, Iterator<V>
{
+ /**
+ * Reset this Iterator to its starting position
+ */
+ public void rewind();
}
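
The point of rewind() is reuse: a caller that previously allocated a fresh search iterator per search can now hold one and reset it between uses. A sketch, under the assumption that BTree.slice(btree, comparator, Dir) is the usual way to obtain one of these iterators:

    import java.util.Arrays;
    import java.util.Comparator;

    import org.apache.cassandra.utils.btree.BTree;
    import org.apache.cassandra.utils.btree.BTreeSearchIterator;
    import org.apache.cassandra.utils.btree.UpdateFunction;

    public class RewindSketch
    {
        public static void main(String[] args)
        {
            Object[] btree = BTree.build(Arrays.asList(1, 3, 5, 7), UpdateFunction.noOp());
            BTreeSearchIterator<Integer, Integer> si =
                BTree.slice(btree, Comparator.<Integer>naturalOrder(), BTree.Dir.ASC);

            System.out.println(si.next(5));   // 5; the iterator has advanced past 1 and 3
            si.rewind();                      // reset instead of allocating a new iterator
            System.out.println(si.next(1));   // 1; would have been null without the rewind
        }
    }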
diff --git a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
index a6197f8..29aed4b 100644
--- a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
@@ -45,8 +45,13 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V>
this.comparator = comparator;
this.lowerBound = lowerBound;
this.upperBound = upperBound;
- this.nextPos = forwards ? lowerBound : upperBound;
- this.hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ rewind();
+ }
+
+ public void rewind()
+ {
+ nextPos = forwards ? lowerBound : upperBound;
+ hasNext = nextPos >= lowerBound && nextPos <= upperBound;
}
public V next()
@@ -73,12 +78,30 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V>
return Arrays.binarySearch(keys, lb, ub + 1, key, comparator);
}
+ private void updateHasNext()
+ {
+ hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ }
+
public V next(K key)
{
if (!hasNext)
return null;
V result = null;
+ // first check the current position in case of sequential access
+ if (comparator.compare(key, keys[nextPos]) == 0)
+ {
+ hasCurrent = true;
+ result = (V) keys[nextPos];
+ nextPos += forwards ? 1 : -1;
+ }
+ updateHasNext();
+
+ if (result != null || !hasNext)
+ return result;
+
+ // otherwise search against the remaining values
int find = searchNext(key);
if (find >= 0)
{
@@ -91,7 +114,7 @@ public class LeafBTreeSearchIterator<K, V> implements BTreeSearchIterator<K, V>
nextPos = (forwards ? -1 : -2) - find;
hasCurrent = false;
}
- hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+ updateHasNext();
return result;
}
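
A design note on the next(K) fast path above: strictly ascending key probes, which is exactly the access pattern of the serialization paths rewritten earlier in this patch (columns are looked up in sorted order), now hit the O(1) current-position check and never reach the binary search; out-of-order keys fall through to searchNext as before.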
diff --git a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index e2c6e33..7fadac4 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -35,15 +35,12 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import io.netty.util.concurrent.FastThreadLocalThread;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.UpdateBuilder;
-import org.apache.cassandra.config.Config.CommitLogSync;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.marshal.UTF8Type;
@@ -451,7 +448,7 @@ public abstract class CommitLogStressTest
{
                    mutation = Mutation.serializer.deserialize(bufIn,
                                                               desc.getMessagingVersion(),
-                                                              SerializationHelper.Flag.LOCAL);
+                                                              DeserializationHelper.Flag.LOCAL);
}
catch (IOException e)
{
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 8709775..e0215b7 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
@@ -368,7 +369,7 @@ public class ReadCommandTest
MessagingService.current_version,
cfs.metadata(),
columnFilter,
-                                                                      SerializationHelper.Flag.LOCAL));
+                                                                      DeserializationHelper.Flag.LOCAL));
}
}
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index c52fd3d..392a1a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
import org.apache.cassandra.Util;
import org.apache.cassandra.cache.IMeasurableMemory;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
@@ -279,6 +280,7 @@ public class RowIndexEntryTest extends CQLTester
{
private final UnfilteredRowIterator iterator;
private final SequentialWriter writer;
+ private final SerializationHelper helper;
private final SerializationHeader header;
private final int version;
@@ -306,6 +308,7 @@ public class RowIndexEntryTest extends CQLTester
{
this.iterator = iterator;
this.writer = writer;
+ this.helper = new SerializationHelper(header);
this.header = header;
this.version = version;
            this.observers = observers == null ? Collections.emptyList() : observers;
@@ -317,7 +320,7 @@ public class RowIndexEntryTest extends CQLTester
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
if (header.hasStatic())
-               UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), header, writer, version);
+               UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), helper, writer, version);
}
public ColumnIndex build() throws IOException
@@ -358,7 +361,7 @@ public class RowIndexEntryTest extends CQLTester
startPosition = pos;
}
-           UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, pos - previousRowStart, version);
+           UnfilteredSerializer.serializer.serialize(unfiltered, helper, writer, pos - previousRowStart, version);
// notify observers about each new row
if (!observers.isEmpty())
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 18bc6e0..fa3295a 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.RebufferingInputStream;
@@ -62,7 +62,7 @@ public class CDCTestReplayer extends CommitLogReplayer
Mutation mutation;
try
{
-           mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+           mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL);
if (mutation.trackedByCDC())
sawCDCMutation = true;
}
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 9a22b04..5b87d68 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -26,7 +26,7 @@ import org.junit.Assert;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.RebufferingInputStream;
@@ -65,7 +65,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
Mutation mutation;
try
{
-           mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+           mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL);
Assert.assertTrue(processor.apply(mutation));
}
catch (IOException e)
diff --git a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
index 7cc1291..e60fb64 100644
--- a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
@@ -88,14 +88,20 @@ public class BTreeTest
}
};
- private static List<Integer> seq(int count)
+ private static List<Integer> seq(int count, int interval)
{
List<Integer> r = new ArrayList<>();
for (int i = 0 ; i < count ; i++)
- r.add(i);
+ if (i % interval == 0)
+ r.add(i);
return r;
}
+ private static List<Integer> seq(int count)
+ {
+ return seq(count, 1);
+ }
+
private static List<Integer> rand(int count)
{
Random rand = ThreadLocalRandom.current();
@@ -133,27 +139,60 @@ public class BTreeTest
}
@Test
- public void testApplyForwards()
+ public void testApply()
{
List<Integer> input = seq(71);
Object[] btree = BTree.build(input, noOp);
final List<Integer> result = new ArrayList<>();
- BTree.<Integer>apply(btree, i -> result.add(i), false);
+ BTree.<Integer>apply(btree, i -> result.add(i));
org.junit.Assert.assertArrayEquals(input.toArray(),result.toArray());
}
@Test
- public void testApplyReverse()
+ public void inOrderAccumulation()
{
List<Integer> input = seq(71);
Object[] btree = BTree.build(input, noOp);
+ long result = BTree.<Integer>accumulate(btree, (o, l) -> {
+ Assert.assertEquals((long) o, l + 1);
+ return o;
+ }, -1);
+ Assert.assertEquals(result, 70);
+ }
- final List<Integer> result = new ArrayList<>();
- BTree.<Integer>apply(btree, i -> result.add(i), true);
+ @Test
+ public void accumulateFrom()
+ {
+ int limit = 100;
+ for (int interval=1; interval<=5; interval++)
+ {
+ List<Integer> input = seq(limit, interval);
+ Object[] btree = BTree.build(input, noOp);
+ for (int start=0; start<=limit; start+=interval)
+ {
+ int thisInterval = interval;
+               String errMsg = String.format("interval=%s, start=%s", interval, start);
+ long result = BTree.accumulate(btree, (o, l) -> {
+ Assert.assertEquals(errMsg, (long) o, l + thisInterval);
+ return o;
+ }, Comparator.naturalOrder(), start, start - thisInterval);
+               Assert.assertEquals(errMsg, result, (limit-1)/interval*interval);
+ }
+ }
+ }
-       org.junit.Assert.assertArrayEquals(Lists.reverse(input).toArray(), result.toArray());
+ /**
+    * accumulate function should not be called if we ask it to start past the end of the btree
+ */
+ @Test
+ public void accumulateFromEnd()
+ {
+ List<Integer> input = seq(100);
+ Object[] btree = BTree.build(input, noOp);
+       long result = BTree.accumulate(btree, (o, l) -> 1, Integer::compareTo, 101, 0L);
+ Assert.assertEquals(0, result);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]