9472: Reintroduce Off-Heap Memtables patch by benedict and stefania
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2f412431 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2f412431 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2f412431 Branch: refs/heads/trunk Commit: 2f41243191c381193a3bf6ec3730ff6555325d06 Parents: a0901b8 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Wed Nov 4 17:03:38 2015 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Wed Jan 27 10:49:26 2016 +0000 ---------------------------------------------------------------------- conf/cassandra.yaml | 1 + .../cassandra/config/DatabaseDescriptor.java | 3 +- .../db/AbstractBufferClusteringPrefix.java | 72 ++++++ .../cassandra/db/AbstractClusteringPrefix.java | 44 ---- .../apache/cassandra/db/BufferClustering.java | 44 ++++ src/java/org/apache/cassandra/db/CBuilder.java | 6 +- .../org/apache/cassandra/db/Clustering.java | 150 ++++++------ .../cassandra/db/ClusteringComparator.java | 3 - .../apache/cassandra/db/ClusteringPrefix.java | 4 +- .../org/apache/cassandra/db/LegacyLayout.java | 8 +- src/java/org/apache/cassandra/db/Memtable.java | 5 +- .../org/apache/cassandra/db/MultiCBuilder.java | 2 +- .../apache/cassandra/db/NativeClustering.java | 125 ++++++++++ .../apache/cassandra/db/NativeDecoratedKey.java | 6 +- .../apache/cassandra/db/RowUpdateBuilder.java | 2 - .../org/apache/cassandra/db/Serializers.java | 6 +- .../db/SinglePartitionReadCommand.java | 18 +- src/java/org/apache/cassandra/db/Slice.java | 2 +- .../apache/cassandra/db/filter/RowFilter.java | 4 +- .../db/partitions/AbstractBTreePartition.java | 25 +- .../db/partitions/AtomicBTreePartition.java | 66 +++++- .../db/partitions/FilteredPartition.java | 2 +- .../apache/cassandra/db/rows/AbstractCell.java | 79 +++++++ .../org/apache/cassandra/db/rows/BTreeRow.java | 33 ++- .../apache/cassandra/db/rows/BufferCell.java | 232 ------------------- 
src/java/org/apache/cassandra/db/rows/Cell.java | 161 ++++++++++++- .../apache/cassandra/db/rows/NativeCell.java | 151 ++++++++++++ .../apache/cassandra/db/rows/RowIterator.java | 5 - .../db/rows/UnfilteredRowIterators.java | 27 --- .../apache/cassandra/db/transform/BaseRows.java | 4 + .../cassandra/db/transform/Transformation.java | 6 + .../apache/cassandra/db/view/TemporalRow.java | 11 +- .../cassandra/index/internal/IndexEntry.java | 1 + .../index/internal/keys/KeysSearcher.java | 4 +- .../io/sstable/metadata/MetadataCollector.java | 4 - .../cassandra/io/util/MemoryInputStream.java | 2 +- .../apache/cassandra/service/CacheService.java | 2 - .../cassandra/thrift/CassandraServer.java | 16 +- .../cassandra/thrift/ThriftResultsMerger.java | 2 +- .../utils/memory/AbstractAllocator.java | 1 - .../cassandra/utils/memory/EnsureOnHeap.java | 150 ++++++++++++ .../cassandra/utils/memory/HeapAllocator.java | 5 + .../apache/cassandra/utils/memory/HeapPool.java | 5 - .../cassandra/utils/memory/MemoryUtil.java | 19 +- .../utils/memory/MemtableAllocator.java | 5 +- .../cassandra/utils/memory/MemtablePool.java | 1 - .../cassandra/utils/memory/NativeAllocator.java | 46 +++- .../cassandra/utils/memory/NativePool.java | 6 - .../cassandra/utils/memory/SlabAllocator.java | 15 +- .../apache/cassandra/utils/memory/SlabPool.java | 5 - test/conf/cassandra.yaml | 3 +- .../org/apache/cassandra/db/KeyspaceTest.java | 4 +- .../org/apache/cassandra/db/NativeCellTest.java | 171 ++++++++++++++ .../cassandra/db/RangeTombstoneListTest.java | 2 +- .../apache/cassandra/db/RangeTombstoneTest.java | 20 +- .../apache/cassandra/db/ReadMessageTest.java | 2 +- .../org/apache/cassandra/db/RowCacheTest.java | 2 +- .../rows/DigestBackwardCompatibilityTest.java | 2 - .../rows/UnfilteredRowIteratorsMergeTest.java | 2 +- .../io/sstable/CQLSSTableWriterClientTest.java | 4 - .../cassandra/io/sstable/SSTableLoaderTest.java | 2 +- .../service/pager/PagingStateTest.java | 2 +- 62 files changed, 1265 insertions(+), 
547 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index e29a6d3..a9749f2 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -398,6 +398,7 @@ concurrent_materialized_view_writes: 32 # Options are: # heap_buffers: on heap nio buffers # offheap_buffers: off heap (direct) nio buffers +# offheap_objects: off heap objects memtable_allocation_type: heap_buffers # Total space to use for commit logs on disk. http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2a2719a..b09605f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1847,8 +1847,7 @@ public class DatabaseDescriptor } return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); case offheap_objects: - throw new ConfigurationException("offheap_objects are not available in 3.0. 
They should be re-introduced in a future release, see https://issues.apache.org/jira/browse/CASSANDRA-9472 for details"); - // return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); + return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); default: throw new AssertionError(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java new file mode 100644 index 0000000..95bc777 --- /dev/null +++ b/src/java/org/apache/cassandra/db/AbstractBufferClusteringPrefix.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.utils.ObjectSizes; + +public abstract class AbstractBufferClusteringPrefix extends AbstractClusteringPrefix +{ + public static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0]; + private static final long EMPTY_SIZE = ObjectSizes.measure(Clustering.make(EMPTY_VALUES_ARRAY)); + + protected final Kind kind; + protected final ByteBuffer[] values; + + protected AbstractBufferClusteringPrefix(Kind kind, ByteBuffer[] values) + { + this.kind = kind; + this.values = values; + } + + public Kind kind() + { + return kind; + } + + public ClusteringPrefix clustering() + { + return this; + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + + public ByteBuffer[] getRawValues() + { + return values; + } + + public long unsharedHeapSize() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java index 2631b46..0b1daf7 100644 --- a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java @@ -22,48 +22,14 @@ import java.security.MessageDigest; import java.util.Objects; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.ObjectSizes; public abstract class AbstractClusteringPrefix implements ClusteringPrefix { - protected static final ByteBuffer[] EMPTY_VALUES_ARRAY = new ByteBuffer[0]; - - private static final long EMPTY_SIZE = 
ObjectSizes.measure(new Clustering(EMPTY_VALUES_ARRAY)); - - protected final Kind kind; - protected final ByteBuffer[] values; - - protected AbstractClusteringPrefix(Kind kind, ByteBuffer[] values) - { - this.kind = kind; - this.values = values; - } - - public Kind kind() - { - return kind; - } - public ClusteringPrefix clustering() { return this; } - public int size() - { - return values.length; - } - - public ByteBuffer get(int i) - { - return values[i]; - } - - public ByteBuffer[] getRawValues() - { - return values; - } - public int dataSize() { int size = 0; @@ -86,16 +52,6 @@ public abstract class AbstractClusteringPrefix implements ClusteringPrefix FBUtilities.updateWithByte(digest, kind().ordinal()); } - public long unsharedHeapSize() - { - return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values); - } - - public long unsharedHeapSizeExcludingData() - { - return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values); - } - @Override public final int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/BufferClustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BufferClustering.java b/src/java/org/apache/cassandra/db/BufferClustering.java new file mode 100644 index 0000000..7c6bb20 --- /dev/null +++ b/src/java/org/apache/cassandra/db/BufferClustering.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * The clustering column values for a row. + * <p> + * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have + * as many values as there is clustering columns in the table it is part of. It is the clustering + * prefix used by rows. + * <p> + * Note however that while it's size must be equal to the table clustering size, a clustering can have + * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null, + * all of the following ones will be too because that's what thrift allows, but it's never assumed by the + * code so we could start generally allowing nulls for clustering columns if we wanted to). + */ +public class BufferClustering extends AbstractBufferClusteringPrefix implements Clustering +{ + BufferClustering(ByteBuffer... 
values) + { + super(Kind.CLUSTERING, values); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/CBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/CBuilder.java b/src/java/org/apache/cassandra/db/CBuilder.java index 94feb93..73b575f 100644 --- a/src/java/org/apache/cassandra/db/CBuilder.java +++ b/src/java/org/apache/cassandra/db/CBuilder.java @@ -162,7 +162,7 @@ public abstract class CBuilder built = true; // Currently, only dense table can leave some clustering column out (see #7990) - return size == 0 ? Clustering.EMPTY : new Clustering(values); + return size == 0 ? Clustering.EMPTY : Clustering.make(values); } public Slice.Bound buildBound(boolean isStart, boolean isInclusive) @@ -196,7 +196,7 @@ public abstract class CBuilder ByteBuffer[] newValues = Arrays.copyOf(values, type.size()); newValues[size] = value; - return new Clustering(newValues); + return Clustering.make(newValues); } public Clustering buildWith(List<ByteBuffer> newValues) @@ -207,7 +207,7 @@ public abstract class CBuilder for (ByteBuffer value : newValues) buffers[newSize++] = value; - return new Clustering(buffers); + return Clustering.make(buffers); } public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, boolean isInclusive) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Clustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Clustering.java b/src/java/org/apache/cassandra/db/Clustering.java index a40cc1f..f5ffae4 100644 --- a/src/java/org/apache/cassandra/db/Clustering.java +++ b/src/java/org/apache/cassandra/db/Clustering.java @@ -1,53 +1,91 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ package org.apache.cassandra.db; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.io.util.*; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.memory.AbstractAllocator; -/** - * The clustering column values for a row. - * <p> - * A {@code Clustering} is a {@code ClusteringPrefix} that must always be "complete", i.e. have - * as many values as there is clustering columns in the table it is part of. It is the clustering - * prefix used by rows. - * <p> - * Note however that while it's size must be equal to the table clustering size, a clustering can have - * {@code null} values, and this mostly for thrift backward compatibility (in practice, if a value is null, - * all of the following ones will be too because that's what thrift allows, but it's never assumed by the - * code so we could start generally allowing nulls for clustering columns if we wanted to). - */ -public class Clustering extends AbstractClusteringPrefix +import static org.apache.cassandra.db.AbstractBufferClusteringPrefix.EMPTY_VALUES_ARRAY; + +public interface Clustering extends ClusteringPrefix { public static final Serializer serializer = new Serializer(); + public long unsharedHeapSizeExcludingData(); + + public default Clustering copy(AbstractAllocator allocator) + { + // Important for STATIC_CLUSTERING (but must copy empty native clustering types). + if (size() == 0) + return kind() == Kind.STATIC_CLUSTERING ? 
this : new BufferClustering(EMPTY_VALUES_ARRAY); + + ByteBuffer[] newValues = new ByteBuffer[size()]; + for (int i = 0; i < size(); i++) + { + ByteBuffer val = get(i); + newValues[i] = val == null ? null : allocator.clone(val); + } + return new BufferClustering(newValues); + } + + public default String toString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size(); i++) + { + ColumnDefinition c = metadata.clusteringColumns().get(i); + sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i))); + } + return sb.toString(); + } + + public default String toCQLString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < size(); i++) + { + ColumnDefinition c = metadata.clusteringColumns().get(i); + sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i))); + } + return sb.toString(); + } + + public static Clustering make(ByteBuffer... values) + { + return new BufferClustering(values); + } + /** * The special cased clustering used by all static rows. It is a special case in the * sense that it's always empty, no matter how many clustering columns the table has. */ - public static final Clustering STATIC_CLUSTERING = new Clustering(EMPTY_VALUES_ARRAY) + public static final Clustering STATIC_CLUSTERING = new BufferClustering(EMPTY_VALUES_ARRAY) { @Override public Kind kind() @@ -69,7 +107,7 @@ public class Clustering extends AbstractClusteringPrefix }; /** Empty clustering for tables having no clustering columns. */ - public static final Clustering EMPTY = new Clustering(EMPTY_VALUES_ARRAY) + public static final Clustering EMPTY = new BufferClustering(EMPTY_VALUES_ARRAY) { @Override public String toString(CFMetaData metadata) @@ -78,50 +116,6 @@ public class Clustering extends AbstractClusteringPrefix } }; - public Clustering(ByteBuffer... 
values) - { - super(Kind.CLUSTERING, values); - } - - public Kind kind() - { - return Kind.CLUSTERING; - } - - public Clustering copy(AbstractAllocator allocator) - { - // Important for STATIC_CLUSTERING (but no point in being wasteful in general). - if (size() == 0) - return this; - - ByteBuffer[] newValues = new ByteBuffer[size()]; - for (int i = 0; i < size(); i++) - newValues[i] = values[i] == null ? null : allocator.clone(values[i]); - return new Clustering(newValues); - } - - public String toString(CFMetaData metadata) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < size(); i++) - { - ColumnDefinition c = metadata.clusteringColumns().get(i); - sb.append(i == 0 ? "" : ", ").append(c.name).append('=').append(get(i) == null ? "null" : c.type.getString(get(i))); - } - return sb.toString(); - } - - public String toCQLString(CFMetaData metadata) - { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < size(); i++) - { - ColumnDefinition c = metadata.clusteringColumns().get(i); - sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i))); - } - return sb.toString(); - } - /** * Serializer for Clustering object. 
* <p> @@ -161,7 +155,7 @@ public class Clustering extends AbstractClusteringPrefix return EMPTY; ByteBuffer[] values = ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, types.size(), version, types); - return new Clustering(values); + return new BufferClustering(values); } public Clustering deserialize(ByteBuffer in, int version, List<AbstractType<?>> types) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringComparator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java b/src/java/org/apache/cassandra/db/ClusteringComparator.java index f3411cf..f5f6ae8 100644 --- a/src/java/org/apache/cassandra/db/ClusteringComparator.java +++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -29,8 +28,6 @@ import com.google.common.collect.ImmutableList; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FastByteOperations; import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 9477651..cacaeb5 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -74,7 +74,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable */ 
public final int comparedToClustering; - private Kind(int comparison, int comparedToClustering) + Kind(int comparison, int comparedToClustering) { this.comparison = comparison; this.comparedToClustering = comparedToClustering; @@ -500,7 +500,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable { assert nextIsRow; deserializeAll(); - Clustering clustering = new Clustering(nextValues); + Clustering clustering = Clustering.make(nextValues); nextValues = null; return clustering; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index 07778b3..4e1eab5 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -103,7 +103,7 @@ public abstract class LegacyLayout if (metadata.isSuper()) { assert superColumnName != null; - return decodeForSuperColumn(metadata, new Clustering(superColumnName), cellname); + return decodeForSuperColumn(metadata, Clustering.make(superColumnName), cellname); } assert superColumnName == null; @@ -161,7 +161,7 @@ public abstract class LegacyLayout { // If it's a compact table, it means the column is in fact a "dynamic" one if (metadata.isCompactTable()) - return new LegacyCellName(new Clustering(column), metadata.compactValueColumn(), null); + return new LegacyCellName(Clustering.make(column), metadata.compactValueColumn(), null); if (def == null) throw new UnknownColumnException(metadata, column); @@ -298,7 +298,7 @@ public abstract class LegacyLayout ? 
CompositeType.splitName(value) : Collections.singletonList(value); - return new Clustering(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); + return Clustering.make(components.subList(0, Math.min(csize, components.size())).toArray(new ByteBuffer[csize])); } public static ByteBuffer encodeClustering(CFMetaData metadata, ClusteringPrefix clustering) @@ -1308,7 +1308,7 @@ public abstract class LegacyLayout ByteBuffer[] values = new ByteBuffer[bound.size()]; for (int i = 0; i < bound.size(); i++) values[i] = bound.get(i); - return new Clustering(values); + return Clustering.make(values); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 952c045..244c7b6 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -40,6 +40,7 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.index.transactions.UpdateTransaction; @@ -53,6 +54,7 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapAllocator; import org.apache.cassandra.utils.memory.MemtableAllocator; import org.apache.cassandra.utils.memory.MemtablePool; @@ -60,7 +62,7 @@ public class Memtable implements Comparable<Memtable> { 
private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool(); + public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool(); private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000"))); private final MemtableAllocator allocator; @@ -526,6 +528,7 @@ public class Memtable implements Comparable<Memtable> assert entry.getKey() instanceof DecoratedKey; DecoratedKey key = (DecoratedKey)entry.getKey(); ClusteringIndexFilter filter = dataRange.clusteringIndexFilter(key); + return filter.getUnfilteredRowIterator(columnFilter, entry.getValue()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/MultiCBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/MultiCBuilder.java b/src/java/org/apache/cassandra/db/MultiCBuilder.java index 8353703..7a4eef0 100644 --- a/src/java/org/apache/cassandra/db/MultiCBuilder.java +++ b/src/java/org/apache/cassandra/db/MultiCBuilder.java @@ -239,7 +239,7 @@ public abstract class MultiCBuilder if (hasMissingElements) return BTreeSet.empty(comparator); - return BTreeSet.of(comparator, size == 0 ? Clustering.EMPTY : new Clustering(elements)); + return BTreeSet.of(comparator, size == 0 ? 
Clustering.EMPTY : Clustering.make(elements)); } public NavigableSet<Slice.Bound> buildBound(boolean isStart, boolean isInclusive) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeClustering.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/NativeClustering.java b/src/java/org/apache/cassandra/db/NativeClustering.java new file mode 100644 index 0000000..1943b71 --- /dev/null +++ b/src/java/org/apache/cassandra/db/NativeClustering.java @@ -0,0 +1,125 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.MemoryUtil; +import org.apache.cassandra.utils.memory.NativeAllocator; + +public class NativeClustering extends AbstractClusteringPrefix implements Clustering +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new NativeClustering()); + + private final long peer; + + private NativeClustering() { peer = 0; } + + public NativeClustering(NativeAllocator allocator, OpOrder.Group writeOp, Clustering clustering) + { + int count = clustering.size(); + int metadataSize = (count * 2) + 4; + int dataSize = clustering.dataSize(); + int bitmapSize = ((count + 7) >>> 3); + + assert count < 64 << 10; + assert dataSize < 64 << 10; + + peer = allocator.allocate(metadataSize + dataSize + bitmapSize, writeOp); + long bitmapStart = peer + metadataSize; + MemoryUtil.setShort(peer, (short) count); + MemoryUtil.setShort(peer + (metadataSize - 2), (short) dataSize); // goes at the end of the other offsets + + MemoryUtil.setByte(bitmapStart, bitmapSize, (byte) 0); + long dataStart = peer + metadataSize + bitmapSize; + int dataOffset = 0; + for (int i = 0 ; i < count ; i++) + { + MemoryUtil.setShort(peer + 2 + i * 2, (short) dataOffset); + + ByteBuffer value = clustering.get(i); + if (value == null) + { + long boffset = bitmapStart + (i >>> 3); + int b = MemoryUtil.getByte(boffset); + b |= 1 << (i & 7); + MemoryUtil.setByte(boffset, (byte) b); + continue; + } + + assert value.order() == ByteOrder.BIG_ENDIAN; + + int size = value.remaining(); + MemoryUtil.setBytes(dataStart + dataOffset, value); + dataOffset += size; + } + } + + public Kind kind() + { + return Kind.CLUSTERING; + } + + public int size() + { + return MemoryUtil.getShort(peer); + } + + public ByteBuffer get(int i) + { + // offset at which we store the dataOffset + int size = 
size(); + if (i >= size) + throw new IndexOutOfBoundsException(); + + int metadataSize = (size * 2) + 4; + int bitmapSize = ((size + 7) >>> 3); + long bitmapStart = peer + metadataSize; + int b = MemoryUtil.getByte(bitmapStart + (i >>> 3)); + if ((b & (1 << (i & 7))) != 0) + return null; + + int startOffset = MemoryUtil.getShort(peer + 2 + i * 2); + int endOffset = MemoryUtil.getShort(peer + 4 + i * 2); + return MemoryUtil.getByteBuffer(bitmapStart + bitmapSize + startOffset, + endOffset - startOffset, + ByteOrder.BIG_ENDIAN); + } + + public ByteBuffer[] getRawValues() + { + ByteBuffer[] values = new ByteBuffer[size()]; + for (int i = 0 ; i < values.length ; i++) + values[i] = get(i); + return values; + } + + public long unsharedHeapSize() + { + return EMPTY_SIZE; + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/NativeDecoratedKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java index ca874c3..019209e 100644 --- a/src/java/org/apache/cassandra/db/NativeDecoratedKey.java +++ b/src/java/org/apache/cassandra/db/NativeDecoratedKey.java @@ -18,9 +18,11 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.cassandra.dht.Token; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapAllocator; import org.apache.cassandra.utils.memory.MemoryUtil; import org.apache.cassandra.utils.memory.NativeAllocator; @@ -32,6 +34,8 @@ public class NativeDecoratedKey extends DecoratedKey { super(token); assert key != null; + assert key.order() == ByteOrder.BIG_ENDIAN; + int size = key.remaining(); this.peer = allocator.allocate(4 + size, writeOp); MemoryUtil.setInt(peer, size); @@ -40,6 +44,6 @@ public class 
NativeDecoratedKey extends DecoratedKey public ByteBuffer getKey() { - return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer)); + return MemoryUtil.getByteBuffer(peer + 4, MemoryUtil.getInt(peer), ByteOrder.BIG_ENDIAN); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java index 8ace988..0ceec90 100644 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -18,7 +18,6 @@ package org.apache.cassandra.db; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,7 +26,6 @@ import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.marshal.SetType; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.partitions.*; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Serializers.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Serializers.java b/src/java/org/apache/cassandra/db/Serializers.java index 9b29d89..17f1de0 100644 --- a/src/java/org/apache/cassandra/db/Serializers.java +++ b/src/java/org/apache/cassandra/db/Serializers.java @@ -29,8 +29,6 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.utils.ByteBufferUtil; -import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo; - /** * Holds references on 
serializers that depend on the table definition. */ @@ -70,7 +68,7 @@ public class Serializers return Clustering.EMPTY; if (!metadata.isCompound()) - return new Clustering(bb); + return Clustering.make(bb); List<ByteBuffer> components = CompositeType.splitName(bb); byte eoc = CompositeType.lastEOC(bb); @@ -81,7 +79,7 @@ public class Serializers if (components.size() > clusteringSize) components = components.subList(0, clusteringSize); - return new Clustering(components.toArray(new ByteBuffer[clusteringSize])); + return Clustering.make(components.toArray(new ByteBuffer[clusteringSize])); } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 9ad9ba3..680b4b5 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -460,8 +460,7 @@ public class SinglePartitionReadCommand extends ReadCommand { Tracing.trace("Executing single-partition query on {}", cfs.name); - boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap(); - return queryMemtableAndDiskInternal(cfs, copyOnHeap); + return queryMemtableAndDiskInternal(cfs); } @Override @@ -470,7 +469,7 @@ public class SinglePartitionReadCommand extends ReadCommand return oldestUnrepairedTombstone; } - private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap) + private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs) { /* * We have 2 main strategies: @@ -484,7 +483,7 @@ public class SinglePartitionReadCommand extends ReadCommand * of shards so have the same problem). 
*/ if (clusteringIndexFilter() instanceof ClusteringIndexNamesFilter && queryNeitherCountersNorCollections()) - return queryMemtableAndSSTablesInTimestampOrder(cfs, copyOnHeap, (ClusteringIndexNamesFilter)clusteringIndexFilter()); + return queryMemtableAndSSTablesInTimestampOrder(cfs, (ClusteringIndexNamesFilter)clusteringIndexFilter()); Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); @@ -502,10 +501,8 @@ public class SinglePartitionReadCommand extends ReadCommand @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); - @SuppressWarnings("resource") // same as above - UnfilteredRowIterator maybeCopied = copyOnHeap ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) : iter; oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, partition.stats().minLocalDeletionTime); - iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(maybeCopied, nowInSec()) : maybeCopied); + iterators.add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter); } /* * We can't eliminate full sstables based on the timestamp of what we've already read like @@ -649,7 +646,7 @@ public class SinglePartitionReadCommand extends ReadCommand * no collection or counters are included). * This method assumes the filter is a {@code ClusteringIndexNamesFilter}. 
*/ - private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, boolean copyOnHeap, ClusteringIndexNamesFilter filter) + private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFamilyStore cfs, ClusteringIndexNamesFilter filter) { Tracing.trace("Acquiring sstable references"); ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); @@ -668,10 +665,7 @@ public class SinglePartitionReadCommand extends ReadCommand if (iter.isEmpty()) continue; - UnfilteredRowIterator clonedFilter = copyOnHeap - ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance) - : iter; - result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result, filter, false); + result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result, filter, false); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/Slice.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Slice.java b/src/java/org/apache/cassandra/db/Slice.java index 7fde45e..8611470 100644 --- a/src/java/org/apache/cassandra/db/Slice.java +++ b/src/java/org/apache/cassandra/db/Slice.java @@ -343,7 +343,7 @@ public class Slice * <p> * This can be either a start or an end bound, and this can be either inclusive or exclusive. 
*/ - public static class Bound extends AbstractClusteringPrefix + public static class Bound extends AbstractBufferClusteringPrefix { public static final Serializer serializer = new Serializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/filter/RowFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index c234fc9..79bdbd7 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -194,11 +194,11 @@ public abstract class RowFilter implements Iterable<RowFilter.Expression> if (metadata.isCompound()) { List<ByteBuffer> values = CompositeType.splitName(name); - return new Clustering(values.toArray(new ByteBuffer[metadata.comparator.size()])); + return Clustering.make(values.toArray(new ByteBuffer[metadata.comparator.size()])); } else { - return new Clustering(name); + return Clustering.make(name); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index e44124f..c276c57 100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@ -183,7 +183,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> if (slices.size() == 0) { DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); - return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed); + return 
UnfilteredRowIterators.noRowsIterator(metadata, partitionKey(), staticRow, partitionDeletion, reversed); } return slices.size() == 1 @@ -202,9 +202,9 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> } private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter, - ColumnFilter selection, boolean reversed, Holder current, Row staticRow) + ColumnFilter selection, boolean reversed, Holder current, Row staticRow) { - return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(), + return new RowAndDeletionMergeIterator(metadata, partitionKey(), current.deletionInfo.getPartitionDeletion(), selection, staticRow, reversed, current.stats, rowIter, deleteIter, canHaveShadowedData()); @@ -215,22 +215,10 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> final Holder current; final ColumnFilter selection; - private AbstractIterator(ColumnFilter selection, boolean isReversed) - { - this(AbstractBTreePartition.this.holder(), selection, isReversed); - } - - private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed) - { - this(current, - AbstractBTreePartition.this.staticRow(current, selection, false), - selection, isReversed); - } - private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed) { super(AbstractBTreePartition.this.metadata, - AbstractBTreePartition.this.partitionKey, + AbstractBTreePartition.this.partitionKey(), current.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator // it would also be more precise to return the intersection of the selection and current.columns, @@ -318,10 +306,7 @@ public abstract class AbstractBTreePartition implements Partition, Iterable<Row> BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity); 
builder.auto(false); while (rows.hasNext()) - { - Row row = rows.next(); - builder.add(row); - } + builder.add(rows.next()); if (reversed) builder.reverse(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index 2be882e..c7113d4 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.partitions; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -26,12 +27,12 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.EncodingStats; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.SearchIterator; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.UpdateFunction; import org.apache.cassandra.utils.concurrent.Locks; @@ -181,7 +182,66 @@ public class AtomicBTreePartition extends AbstractBTreePartition if (monitorOwned) Locks.monitorExitUnsafe(this); } + } + + @Override + public DeletionInfo 
deletionInfo() + { + return allocator.ensureOnHeap().applyToDeletionInfo(super.deletionInfo()); + } + @Override + public Row staticRow() + { + return allocator.ensureOnHeap().applyToStatic(super.staticRow()); + } + + @Override + public DecoratedKey partitionKey() + { + return allocator.ensureOnHeap().applyToPartitionKey(super.partitionKey()); + } + + @Override + public Row getRow(Clustering clustering) + { + return allocator.ensureOnHeap().applyToRow(super.getRow(clustering)); + } + + @Override + public Row lastRow() + { + return allocator.ensureOnHeap().applyToRow(super.lastRow()); + } + + @Override + public SearchIterator<Clustering, Row> searchIterator(ColumnFilter columns, boolean reversed) + { + return allocator.ensureOnHeap().applyToPartition(super.searchIterator(columns, reversed)); + } + + @Override + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) + { + return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(selection, slices, reversed)); + } + + @Override + public UnfilteredRowIterator unfilteredIterator() + { + return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator()); + } + + @Override + public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed) + { + return allocator.ensureOnHeap().applyToPartition(super.unfilteredIterator(current, selection, slices, reversed)); + } + + @Override + public Iterator<Row> iterator() + { + return allocator.ensureOnHeap().applyToPartition(super.iterator()); } public boolean usePessimisticLocking() http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java index 26a947b..70a4678 100644 
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java @@ -65,7 +65,7 @@ public class FilteredPartition extends ImmutableBTreePartition public DecoratedKey partitionKey() { - return partitionKey; + return FilteredPartition.this.partitionKey(); } public Row staticRow() http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index 882c0e0..1ea9713 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -17,15 +17,19 @@ */ package org.apache.cassandra.db.rows; +import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.Objects; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.DeletionPurger; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.memory.AbstractAllocator; /** * Base abstract class for {@code Cell} implementations. 
@@ -40,6 +44,81 @@ public abstract class AbstractCell extends Cell super(column); } + public boolean isCounterCell() + { + return !isTombstone() && column.cellValueType().isCounter(); + } + + public boolean isLive(int nowInSec) + { + return localDeletionTime() == NO_DELETION_TIME || (ttl() != NO_TTL && nowInSec < localDeletionTime()); + } + + public boolean isTombstone() + { + return localDeletionTime() != NO_DELETION_TIME && ttl() == NO_TTL; + } + + public boolean isExpiring() + { + return ttl() != NO_TTL; + } + + public Cell markCounterLocalToBeCleared() + { + if (!isCounterCell()) + return this; + + ByteBuffer value = value(); + ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value); + return marked == value ? this : new BufferCell(column, timestamp(), ttl(), localDeletionTime(), marked, path()); + } + + public Cell purge(DeletionPurger purger, int nowInSec) + { + if (!isLive(nowInSec)) + { + if (purger.shouldPurge(timestamp(), localDeletionTime())) + return null; + + // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is + // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since + // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones + // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient + // to do both here. + if (isExpiring()) + { + // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds, + // we'll fulfil our responsibility to repair. 
See discussion at + // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html + return BufferCell.tombstone(column, timestamp(), localDeletionTime() - ttl()); + } + } + return this; + } + + public Cell copy(AbstractAllocator allocator) + { + CellPath path = path(); + return new BufferCell(column, timestamp(), ttl(), localDeletionTime(), allocator.clone(value()), path == null ? null : path.copy(allocator)); + } + + // note: while the cell returned may be different, the value is the same, so if the value is offheap it must be referenced inside a guarded context (or copied) + public Cell updateAllTimestamp(long newTimestamp) + { + return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl(), localDeletionTime(), value(), path()); + } + + public int dataSize() + { + CellPath path = path(); + return TypeSizes.sizeof(timestamp()) + + TypeSizes.sizeof(ttl()) + + TypeSizes.sizeof(localDeletionTime()) + + value().remaining() + + (path == null ? 
0 : path.dataSize()); + } + public void digest(MessageDigest digest) { digest.update(value().duplicate()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index a0912ae..47cfd58 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -62,7 +62,11 @@ public class BTreeRow extends AbstractRow // no expiring cells, this will be Integer.MAX_VALUE; private final int minLocalDeletionTime; - private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime) + private BTreeRow(Clustering clustering, + LivenessInfo primaryKeyLivenessInfo, + Deletion deletion, + Object[] btree, + int minLocalDeletionTime) { assert !deletion.isShadowedBy(primaryKeyLivenessInfo); this.clustering = clustering; @@ -78,7 +82,10 @@ public class BTreeRow extends AbstractRow } // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases. 
- public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree) + public static BTreeRow create(Clustering clustering, + LivenessInfo primaryKeyLivenessInfo, + Deletion deletion, + Object[] btree) { int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time())); if (minDeletionTime != Integer.MIN_VALUE) @@ -87,6 +94,15 @@ public class BTreeRow extends AbstractRow minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); } + return create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + } + + public static BTreeRow create(Clustering clustering, + LivenessInfo primaryKeyLivenessInfo, + Deletion deletion, + Object[] btree, + int minDeletionTime) + { return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); } @@ -113,7 +129,11 @@ public class BTreeRow extends AbstractRow public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo) { assert !primaryKeyLivenessInfo.isEmpty(); - return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo)); + return new BTreeRow(clustering, + primaryKeyLivenessInfo, + Deletion.LIVE, + BTree.empty(), + minDeletionTime(primaryKeyLivenessInfo)); } private static int minDeletionTime(Cell cell) @@ -368,7 +388,7 @@ public class BTreeRow extends AbstractRow return null; int minDeletionTime = minDeletionTime(transformed, info, deletion.time()); - return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime); + return BTreeRow.create(clustering, info, deletion, transformed, minDeletionTime); } public int dataSize() @@ -594,7 +614,7 @@ public class BTreeRow extends AbstractRow return new ComplexColumnData(column, btree, deletion); } - }; + } protected Clustering clustering; protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY; protected Deletion 
deletion = Deletion.LIVE; @@ -680,10 +700,9 @@ public class BTreeRow extends AbstractRow deletion = Deletion.LIVE; int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time()); - Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + Row row = BTreeRow.create(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); reset(); return row; } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 4176ba6..cac63ac 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -17,16 +17,11 @@ */ package org.apache.cassandra.db.rows; -import java.io.IOException; import java.nio.ByteBuffer; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.marshal.ByteType; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.FBUtilities; @@ -88,26 +83,6 @@ public class BufferCell extends AbstractCell return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path); } - public boolean isCounterCell() - { - return !isTombstone() && column.cellValueType().isCounter(); - } - - public boolean isLive(int nowInSec) - { - return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime); - } - - public boolean isTombstone() - { - return localDeletionTime != NO_DELETION_TIME && 
ttl == NO_TTL; - } - - public boolean isExpiring() - { - return ttl != NO_TTL; - } - public long timestamp() { return timestamp; @@ -146,216 +121,9 @@ public class BufferCell extends AbstractCell return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator)); } - public Cell markCounterLocalToBeCleared() - { - if (!isCounterCell()) - return this; - - ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value()); - return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path); - } - - public Cell purge(DeletionPurger purger, int nowInSec) - { - if (!isLive(nowInSec)) - { - if (purger.shouldPurge(timestamp, localDeletionTime)) - return null; - - // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is - // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since - // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones - // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient - // to do both here. - if (isExpiring()) - { - // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds, - // we'll fulfil our responsibility to repair. See discussion at - // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html - return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl); - } - } - return this; - } - - public Cell updateAllTimestamp(long newTimestamp) - { - return new BufferCell(column, isTombstone() ? 
newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path); - } - - public int dataSize() - { - return TypeSizes.sizeof(timestamp) - + TypeSizes.sizeof(ttl) - + TypeSizes.sizeof(localDeletionTime) - + value.remaining() - + (path == null ? 0 : path.dataSize()); - } - public long unsharedHeapSizeExcludingData() { return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData()); } - /** - * The serialization format for cell is: - * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ] - * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ] - * - * where not all field are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following - * meaning: - * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants) - * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_TIMESTAMP_MASK. - * - [ deletion time]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK) - * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK. - * - [ ttl ]: the ttl for the cell. Present if the row is expiring (IS_EXPIRING_MASK) but doesn't have the - * USE_ROW_TTL_MASK. - * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value - * for columns of this type have a fixed length. - * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column. - * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK. - * - [ path ]: the cell path if the column this is a cell of is complex. - */ - static class Serializer implements Cell.Serializer - { - private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not. 
- private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring. - private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Wether the cell has an empty value. This will be the case for tombstone in particular. - private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Wether the cell has the same timestamp than the row this is a cell of. - private final static int USE_ROW_TTL_MASK = 0x10; // Wether the cell has the same ttl than the row this is a cell of. - - public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException - { - assert cell != null; - boolean hasValue = cell.value().hasRemaining(); - boolean isDeleted = cell.isTombstone(); - boolean isExpiring = cell.isExpiring(); - boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); - boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); - int flags = 0; - if (!hasValue) - flags |= HAS_EMPTY_VALUE_MASK; - - if (isDeleted) - flags |= IS_DELETED_MASK; - else if (isExpiring) - flags |= IS_EXPIRING_MASK; - - if (useRowTimestamp) - flags |= USE_ROW_TIMESTAMP_MASK; - if (useRowTTL) - flags |= USE_ROW_TTL_MASK; - - out.writeByte((byte)flags); - - if (!useRowTimestamp) - header.writeTimestamp(cell.timestamp(), out); - - if ((isDeleted || isExpiring) && !useRowTTL) - header.writeLocalDeletionTime(cell.localDeletionTime(), out); - if (isExpiring && !useRowTTL) - header.writeTTL(cell.ttl(), out); - - if (cell.column().isComplex()) - cell.column().cellPathSerializer().serialize(cell.path(), out); - - if (hasValue) - header.getType(cell.column()).writeValue(cell.value(), out); - } - - public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException - { - int flags = in.readUnsignedByte(); - 
boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; - boolean isDeleted = (flags & IS_DELETED_MASK) != 0; - boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; - boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; - boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; - - long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in); - - int localDeletionTime = useRowTTL - ? rowLiveness.localExpirationTime() - : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME); - - int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL); - - CellPath path = column.isComplex() - ? column.cellPathSerializer().deserialize(in) - : null; - - boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter(); - - ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER; - if (hasValue) - { - if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path))) - { - header.getType(column).skipValue(in); - } - else - { - value = header.getType(column).readValue(in); - if (isCounter) - value = helper.maybeClearCounterValue(value); - } - } - - return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); - } - - public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header) - { - long size = 1; // flags - boolean hasValue = cell.value().hasRemaining(); - boolean isDeleted = cell.isTombstone(); - boolean isExpiring = cell.isExpiring(); - boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); - boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); - - if (!useRowTimestamp) - size += header.timestampSerializedSize(cell.timestamp()); - - if ((isDeleted || isExpiring) && !useRowTTL) - size += header.localDeletionTimeSerializedSize(cell.localDeletionTime()); - if (isExpiring && 
!useRowTTL) - size += header.ttlSerializedSize(cell.ttl()); - - if (cell.column().isComplex()) - size += cell.column().cellPathSerializer().serializedSize(cell.path()); - - if (hasValue) - size += header.getType(cell.column()).writtenLength(cell.value()); - - return size; - } - - // Returns if the skipped cell was an actual cell (i.e. it had its presence flag). - public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException - { - int flags = in.readUnsignedByte(); - boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; - boolean isDeleted = (flags & IS_DELETED_MASK) != 0; - boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; - boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; - boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; - - if (!useRowTimestamp) - header.skipTimestamp(in); - - if (!useRowTTL && (isDeleted || isExpiring)) - header.skipLocalDeletionTime(in); - - if (!useRowTTL && isExpiring) - header.skipTTL(in); - - if (column.isComplex()) - column.cellPathSerializer().skip(in); - - if (hasValue) - header.getType(column).skipValue(in); - - return true; - } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2f412431/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index 73d9e44..ad1c39a 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.memory.AbstractAllocator; /** @@ -141,15 +142,165 @@ public abstract class Cell extends ColumnData // Overrides super 
type to provide a more precise return type. public abstract Cell purge(DeletionPurger purger, int nowInSec); - public interface Serializer + /** + * The serialization format for a cell is: + * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ] + * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ] + * + * where not all fields are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following + * meaning (listed in the order serialize() writes them): + * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants) + * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK. + * - [ deletion time ]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK) + * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK. + * - [ ttl ]: the ttl for the cell. Present if the cell is expiring (IS_EXPIRING_MASK) but doesn't have the + * USE_ROW_TTL_MASK. + * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column. + * - [ path ]: the cell path if the column this is a cell of is complex. + * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the values + * of this column's type have a fixed length. + * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK. + */ + static class Serializer { - public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException; + private final static int IS_DELETED_MASK = 0x01; // Whether the cell is a tombstone or not. + private final static int IS_EXPIRING_MASK = 0x02; // Whether the cell is expiring. + private final static int HAS_EMPTY_VALUE_MASK = 0x04; // Whether the cell has an empty value.
This will be the case for tombstones in particular. + private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Whether the cell has the same timestamp as the row this is a cell of. + private final static int USE_ROW_TTL_MASK = 0x10; // Whether the cell has the same ttl (and local expiration time) as the row this is a cell of. + + public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException + { + assert cell != null; + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); + int flags = 0; + if (!hasValue) + flags |= HAS_EMPTY_VALUE_MASK; + + if (isDeleted) + flags |= IS_DELETED_MASK; + else if (isExpiring) + flags |= IS_EXPIRING_MASK; + + if (useRowTimestamp) + flags |= USE_ROW_TIMESTAMP_MASK; + if (useRowTTL) + flags |= USE_ROW_TTL_MASK; + + out.writeByte((byte)flags); + + if (!useRowTimestamp) + header.writeTimestamp(cell.timestamp(), out); + + if ((isDeleted || isExpiring) && !useRowTTL) + header.writeLocalDeletionTime(cell.localDeletionTime(), out); + if (isExpiring && !useRowTTL) + header.writeTTL(cell.ttl(), out); + + if (cell.column().isComplex()) + cell.column().cellPathSerializer().serialize(cell.path(), out); + + if (hasValue) + header.getType(cell.column()).writeValue(cell.value(), out); + } + + public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException + { + int flags = in.readUnsignedByte(); + boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & IS_DELETED_MASK) != 0; + boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; + boolean
useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; + + long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.readTimestamp(in); + + int localDeletionTime = useRowTTL + ? rowLiveness.localExpirationTime() + : (isDeleted || isExpiring ? header.readLocalDeletionTime(in) : NO_DELETION_TIME); + + int ttl = useRowTTL ? rowLiveness.ttl() : (isExpiring ? header.readTTL(in) : NO_TTL); - public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException; + CellPath path = column.isComplex() + ? column.cellPathSerializer().deserialize(in) + : null; - public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header); + boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter(); + + ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (hasValue) + { + if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path))) + { + header.getType(column).skipValue(in); + } + else + { + value = header.getType(column).readValue(in); + if (isCounter) + value = helper.maybeClearCounterValue(value); + } + } + + return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); + } + + public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header) + { + long size = 1; // flags + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); + + if (!useRowTimestamp) + size += header.timestampSerializedSize(cell.timestamp()); + + if ((isDeleted || isExpiring) && !useRowTTL) + size +=
header.localDeletionTimeSerializedSize(cell.localDeletionTime()); + if (isExpiring && !useRowTTL) + size += header.ttlSerializedSize(cell.ttl()); + + if (cell.column().isComplex()) + size += cell.column().cellPathSerializer().serializedSize(cell.path()); + + if (hasValue) + size += header.getType(cell.column()).writtenLength(cell.value()); + + return size; + } // Returns if the skipped cell was an actual cell (i.e. it had its presence flag). - public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException; + public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException + { + int flags = in.readUnsignedByte(); + boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & IS_DELETED_MASK) != 0; + boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; + boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; + + if (!useRowTimestamp) + header.skipTimestamp(in); + + if (!useRowTTL && (isDeleted || isExpiring)) + header.skipLocalDeletionTime(in); + + if (!useRowTTL && isExpiring) + header.skipTTL(in); + + if (column.isComplex()) + column.cellPathSerializer().skip(in); + + if (hasValue) + header.getType(column).skipValue(in); + + return true; // NOTE(review): always true; none of the *_MASK flags encodes a cell "presence" bit + } } }