http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CollectionType.java b/src/java/org/apache/cassandra/db/marshal/CollectionType.java index 1660b2e..0b00b47 100644 --- a/src/java/org/apache/cassandra/db/marshal/CollectionType.java +++ b/src/java/org/apache/cassandra/db/marshal/CollectionType.java @@ -18,22 +18,28 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; +import java.io.DataInput; +import java.io.IOException; import java.util.List; +import java.util.Iterator; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.transport.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.Lists; import org.apache.cassandra.cql3.Maps; import org.apache.cassandra.cql3.Sets; - +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.ByteBufferUtil; /** @@ -47,6 +53,8 @@ public abstract class CollectionType<T> extends AbstractType<T> public static final int MAX_ELEMENTS = 65535; + public static CellPath.Serializer cellPathSerializer = new CollectionPathSerializer(); + public enum Kind { MAP @@ -84,6 +92,8 @@ public abstract class CollectionType<T> extends AbstractType<T> public abstract AbstractType<?> nameComparator(); public 
abstract AbstractType<?> valueComparator(); + protected abstract List<ByteBuffer> serializedValues(Iterator<Cell> cells); + @Override public abstract CollectionSerializer<T> getSerializer(); @@ -132,28 +142,33 @@ public abstract class CollectionType<T> extends AbstractType<T> return kind == Kind.MAP; } - public List<Cell> enforceLimit(ColumnDefinition def, List<Cell> cells, int version) + // Overrided by maps + protected int collectionSize(List<ByteBuffer> values) + { + return values.size(); + } + + protected int enforceLimit(ColumnDefinition def, List<ByteBuffer> values, int version) { assert isMultiCell(); - if (version >= Server.VERSION_3 || cells.size() <= MAX_ELEMENTS) - return cells; + int size = collectionSize(values); + if (version >= Server.VERSION_3 || size <= MAX_ELEMENTS) + return size; logger.error("Detected collection for table {}.{} with {} elements, more than the {} limit. Only the first {}" + " elements will be returned to the client. Please see " + "http://cassandra.apache.org/doc/cql3/CQL.html#collections for more details.", - def.ksName, def.cfName, cells.size(), MAX_ELEMENTS, MAX_ELEMENTS); - return cells.subList(0, MAX_ELEMENTS); + def.ksName, def.cfName, values.size(), MAX_ELEMENTS, MAX_ELEMENTS); + return MAX_ELEMENTS; } - public abstract List<ByteBuffer> serializedValues(List<Cell> cells); - - public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, List<Cell> cells, int version) + public ByteBuffer serializeForNativeProtocol(ColumnDefinition def, Iterator<Cell> cells, int version) { assert isMultiCell(); - cells = enforceLimit(def, cells, version); List<ByteBuffer> values = serializedValues(cells); - return CollectionSerializer.pack(values, cells.size(), version); + int size = enforceLimit(def, values, version); + return CollectionSerializer.pack(values, size, version); } @Override @@ -217,4 +232,28 @@ public abstract class CollectionType<T> extends AbstractType<T> { return this.toString(false); } + + private static class 
CollectionPathSerializer implements CellPath.Serializer + { + public void serialize(CellPath path, DataOutputPlus out) throws IOException + { + ByteBufferUtil.writeWithLength(path.get(0), out); + } + + public CellPath deserialize(DataInput in) throws IOException + { + return CellPath.create(ByteBufferUtil.readWithLength(in)); + } + + public long serializedSize(CellPath path, TypeSizes sizes) + { + return sizes.sizeofWithLength(path.get(0)); + } + + public void skip(DataInput in) throws IOException + { + int length = in.readInt(); + FileUtils.skipBytesFully(in, length); + } + } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java index 1d2c88c..a81d3f8 100644 --- a/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java +++ b/src/java/org/apache/cassandra/db/marshal/ColumnToCollectionType.java @@ -31,6 +31,9 @@ import org.apache.cassandra.serializers.BytesSerializer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; +/* + * This class is deprecated and only kept for backward compatibility. + */ public class ColumnToCollectionType extends AbstractType<ByteBuffer> { // interning instances http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java index f3c041e..01eb58f 100644 --- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java @@ -177,6 +177,7 @@ public class CompositeType extends AbstractCompositeType // most names will be complete. ByteBuffer[] l = new ByteBuffer[types.size()]; ByteBuffer bb = name.duplicate(); + readStatic(bb); int i = 0; while (bb.remaining() > 0) { @@ -186,6 +187,19 @@ public class CompositeType extends AbstractCompositeType return i == l.length ? 
l : Arrays.copyOfRange(l, 0, i); } + public static List<ByteBuffer> splitName(ByteBuffer name) + { + List<ByteBuffer> l = new ArrayList<>(); + ByteBuffer bb = name.duplicate(); + readStatic(bb); + while (bb.remaining() > 0) + { + l.add(ByteBufferUtil.readBytesWithShortLength(bb)); + bb.get(); // skip end-of-component + } + return l; + } + // Extract component idx from bb. Return null if there is not enough component. public static ByteBuffer extractComponent(ByteBuffer bb, int idx) { @@ -318,11 +332,20 @@ public class CompositeType extends AbstractCompositeType public static ByteBuffer build(ByteBuffer... buffers) { - int totalLength = 0; + return build(false, buffers); + } + + public static ByteBuffer build(boolean isStatic, ByteBuffer... buffers) + { + int totalLength = isStatic ? 2 : 0; for (ByteBuffer bb : buffers) totalLength += 2 + bb.remaining() + 1; ByteBuffer out = ByteBuffer.allocate(totalLength); + + if (isStatic) + out.putShort((short)STATIC_MARKER); + for (ByteBuffer bb : buffers) { ByteBufferUtil.writeShortLength(out, bb.remaining()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java index 4b3ce82..687e525 100644 --- a/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java +++ b/src/java/org/apache/cassandra/db/marshal/CounterColumnType.java @@ -24,6 +24,7 @@ import org.apache.cassandra.cql3.Term; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.serializers.CounterSerializer; +import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; public class CounterColumnType extends AbstractType<Long> @@ -59,6 +60,12 @@ public 
class CounterColumnType extends AbstractType<Long> return ByteBufferUtil.bytes(value); } + @Override + public void validateCellValue(ByteBuffer cellValue) throws MarshalException + { + CounterContext.instance().validateContext(cellValue); + } + public int compare(ByteBuffer o1, ByteBuffer o2) { return ByteBufferUtil.compareUnsigned(o1, o2); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DateType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/DateType.java b/src/java/org/apache/cassandra/db/marshal/DateType.java index 359ce52..66da443 100644 --- a/src/java/org/apache/cassandra/db/marshal/DateType.java +++ b/src/java/org/apache/cassandra/db/marshal/DateType.java @@ -124,4 +124,10 @@ public class DateType extends AbstractType<Date> { return TimestampSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 8; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/DoubleType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/DoubleType.java b/src/java/org/apache/cassandra/db/marshal/DoubleType.java index 661b3c9..bc160d5 100644 --- a/src/java/org/apache/cassandra/db/marshal/DoubleType.java +++ b/src/java/org/apache/cassandra/db/marshal/DoubleType.java @@ -97,4 +97,10 @@ public class DoubleType extends AbstractType<Double> { return DoubleSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 8; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/EmptyType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/EmptyType.java b/src/java/org/apache/cassandra/db/marshal/EmptyType.java index f82d767..448376f 100644 
--- a/src/java/org/apache/cassandra/db/marshal/EmptyType.java +++ b/src/java/org/apache/cassandra/db/marshal/EmptyType.java @@ -69,4 +69,10 @@ public class EmptyType extends AbstractType<Void> { return EmptySerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 0; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/FloatType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/FloatType.java b/src/java/org/apache/cassandra/db/marshal/FloatType.java index af02cad..ceedce4 100644 --- a/src/java/org/apache/cassandra/db/marshal/FloatType.java +++ b/src/java/org/apache/cassandra/db/marshal/FloatType.java @@ -96,4 +96,10 @@ public class FloatType extends AbstractType<Float> { return FloatSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 4; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/Int32Type.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/Int32Type.java b/src/java/org/apache/cassandra/db/marshal/Int32Type.java index 67d8142..cb0c584 100644 --- a/src/java/org/apache/cassandra/db/marshal/Int32Type.java +++ b/src/java/org/apache/cassandra/db/marshal/Int32Type.java @@ -109,4 +109,9 @@ public class Int32Type extends AbstractType<Integer> return Int32Serializer.instance; } + @Override + protected int valueLengthIfFixed() + { + return 4; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java index 3e00d71..174ce3a 100644 --- 
a/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java +++ b/src/java/org/apache/cassandra/db/marshal/LexicalUUIDType.java @@ -83,4 +83,10 @@ public class LexicalUUIDType extends AbstractType<UUID> { return UUIDSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 16; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ListType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ListType.java b/src/java/org/apache/cassandra/db/marshal/ListType.java index 03f39d7..73af808 100644 --- a/src/java/org/apache/cassandra/db/marshal/ListType.java +++ b/src/java/org/apache/cassandra/db/marshal/ListType.java @@ -23,14 +23,13 @@ import java.util.*; import org.apache.cassandra.cql3.Json; import org.apache.cassandra.cql3.Lists; import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.CollectionSerializer; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.serializers.ListSerializer; -import org.apache.cassandra.transport.Server; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,12 +168,12 @@ public class ListType<T> extends CollectionType<List<T>> return sb.toString(); } - public List<ByteBuffer> serializedValues(List<Cell> cells) + public List<ByteBuffer> serializedValues(Iterator<Cell> cells) { assert isMultiCell; - List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size()); - for (Cell c : cells) - bbs.add(c.value()); + List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(); + while (cells.hasNext()) + bbs.add(cells.next().value()); return bbs; } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java index 427598d..e02ba3c 100644 --- a/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java +++ b/src/java/org/apache/cassandra/db/marshal/LocalByPartionerType.java @@ -20,11 +20,12 @@ package org.apache.cassandra.db.marshal; import java.nio.ByteBuffer; import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.db.RowPosition; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; /** for sorting columns representing row keys in the row ordering as determined by a partitioner. 
@@ -38,6 +39,11 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer> this.partitioner = partitioner; } + public static LocalByPartionerType getInstance(TypeParser parser) + { + return new LocalByPartionerType(StorageService.getPartitioner()); + } + @Override public ByteBuffer compose(ByteBuffer bytes) { @@ -74,8 +80,8 @@ public class LocalByPartionerType extends AbstractType<ByteBuffer> public int compare(ByteBuffer o1, ByteBuffer o2) { - // o1 and o2 can be empty so we need to use RowPosition, not DecoratedKey - return RowPosition.ForKey.get(o1, partitioner).compareTo(RowPosition.ForKey.get(o2, partitioner)); + // o1 and o2 can be empty so we need to use PartitionPosition, not DecoratedKey + return PartitionPosition.ForKey.get(o1, partitioner).compareTo(PartitionPosition.ForKey.get(o2, partitioner)); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/LongType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/LongType.java b/src/java/org/apache/cassandra/db/marshal/LongType.java index d77d7d0..9d41f4f 100644 --- a/src/java/org/apache/cassandra/db/marshal/LongType.java +++ b/src/java/org/apache/cassandra/db/marshal/LongType.java @@ -118,4 +118,9 @@ public class LongType extends AbstractType<Long> return LongSerializer.instance; } + @Override + protected int valueLengthIfFixed() + { + return 8; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/MapType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/MapType.java b/src/java/org/apache/cassandra/db/marshal/MapType.java index 983710b..28a4fd5 100644 --- a/src/java/org/apache/cassandra/db/marshal/MapType.java +++ b/src/java/org/apache/cassandra/db/marshal/MapType.java @@ -23,7 +23,7 @@ import java.util.*; import 
org.apache.cassandra.cql3.Json; import org.apache.cassandra.cql3.Maps; import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.CollectionSerializer; @@ -173,6 +173,11 @@ public class MapType<K, V> extends CollectionType<Map<K, V>> } @Override + protected int collectionSize(List<ByteBuffer> values) + { + return values.size() / 2; + } + public String toString(boolean ignoreFreezing) { boolean includeFrozenType = !ignoreFreezing && !isMultiCell(); @@ -186,13 +191,14 @@ public class MapType<K, V> extends CollectionType<Map<K, V>> return sb.toString(); } - public List<ByteBuffer> serializedValues(List<Cell> cells) + public List<ByteBuffer> serializedValues(Iterator<Cell> cells) { assert isMultiCell; - List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size() * 2); - for (Cell c : cells) + List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(); + while (cells.hasNext()) { - bbs.add(c.name().collectionElement()); + Cell c = cells.next(); + bbs.add(c.path().get(0)); bbs.add(c.value()); } return bbs; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/ReversedType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/ReversedType.java b/src/java/org/apache/cassandra/db/marshal/ReversedType.java index 2181f74..cf357a8 100644 --- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java +++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java @@ -80,6 +80,12 @@ public class ReversedType<T> extends AbstractType<T> return baseType.compare(o2, o1); } + @Override + public int compareForCQL(ByteBuffer v1, ByteBuffer v2) + { + return baseType.compare(v1, v2); + } + public String getString(ByteBuffer bytes) { return 
baseType.getString(bytes); @@ -129,6 +135,12 @@ public class ReversedType<T> extends AbstractType<T> } @Override + protected int valueLengthIfFixed() + { + return baseType.valueLengthIfFixed(); + } + + @Override public String toString() { return getClass().getName() + "(" + baseType + ")"; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/SetType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/SetType.java b/src/java/org/apache/cassandra/db/marshal/SetType.java index 78aac25..126c6aa 100644 --- a/src/java/org/apache/cassandra/db/marshal/SetType.java +++ b/src/java/org/apache/cassandra/db/marshal/SetType.java @@ -23,12 +23,11 @@ import java.util.*; import org.apache.cassandra.cql3.Json; import org.apache.cassandra.cql3.Sets; import org.apache.cassandra.cql3.Term; -import org.apache.cassandra.db.Cell; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.serializers.SetSerializer; -import org.apache.cassandra.transport.Server; public class SetType<T> extends CollectionType<Set<T>> { @@ -144,11 +143,11 @@ public class SetType<T> extends CollectionType<Set<T>> return sb.toString(); } - public List<ByteBuffer> serializedValues(List<Cell> cells) + public List<ByteBuffer> serializedValues(Iterator<Cell> cells) { - List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(cells.size()); - for (Cell c : cells) - bbs.add(c.name().collectionElement()); + List<ByteBuffer> bbs = new ArrayList<ByteBuffer>(); + while (cells.hasNext()) + bbs.add(cells.next().path().get(0)); return bbs; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java index a1d8d82..64fa750 100644 --- a/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java +++ b/src/java/org/apache/cassandra/db/marshal/TimeUUIDType.java @@ -127,4 +127,10 @@ public class TimeUUIDType extends AbstractType<UUID> { return TimeUUIDSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 16; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/TimestampType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/TimestampType.java b/src/java/org/apache/cassandra/db/marshal/TimestampType.java index b01651d..288f8fd 100644 --- a/src/java/org/apache/cassandra/db/marshal/TimestampType.java +++ b/src/java/org/apache/cassandra/db/marshal/TimestampType.java @@ -125,4 +125,10 @@ public class TimestampType extends AbstractType<Date> { return TimestampSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 8; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/UUIDType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/UUIDType.java b/src/java/org/apache/cassandra/db/marshal/UUIDType.java index 0250eb20..14c9f48 100644 --- a/src/java/org/apache/cassandra/db/marshal/UUIDType.java +++ b/src/java/org/apache/cassandra/db/marshal/UUIDType.java @@ -168,4 +168,10 @@ public class UUIDType extends AbstractType<UUID> { return (uuid.get(6) & 0xf0) >> 4; } + + @Override + protected int valueLengthIfFixed() + { + return 16; + } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java new file mode 100644 index 0000000..2fcd7b3 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AbstractPartitionData.java @@ -0,0 +1,831 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.SearchIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract common class for all non-thread safe Partition implementations. 
+ */ +public abstract class AbstractPartitionData implements Partition, Iterable<Row> +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionData.class); + + protected final CFMetaData metadata; + protected final DecoratedKey key; + + protected final DeletionInfo deletionInfo; + protected final PartitionColumns columns; + + protected Row staticRow; + + protected int rows; + + // The values for the clustering columns of the rows contained in this partition object. If + // clusteringSize is the size of the clustering comparator for this table, clusterings has size + // clusteringSize * rows where rows is the number of rows stored, and row i has it's clustering + // column values at indexes [clusteringSize * i, clusteringSize * (i + 1)). + protected ByteBuffer[] clusterings; + + // The partition key column liveness infos for the rows of this partition (row i has its liveness info at index i). + protected final LivenessInfoArray livenessInfos; + // The row deletion for the rows of this partition (row i has its row deletion at index i). + protected final DeletionTimeArray deletions; + + // The row data (cells data + complex deletions for complex columns) for the rows contained in this partition. + protected final RowDataBlock data; + + // Stats over the rows stored in this partition. + private final RowStats.Collector statsCollector = new RowStats.Collector(); + + // The maximum timestamp for any data contained in this partition. 
+ protected long maxTimestamp = Long.MIN_VALUE; + + private AbstractPartitionData(CFMetaData metadata, + DecoratedKey key, + DeletionInfo deletionInfo, + ByteBuffer[] clusterings, + LivenessInfoArray livenessInfos, + DeletionTimeArray deletions, + PartitionColumns columns, + RowDataBlock data) + { + this.metadata = metadata; + this.key = key; + this.deletionInfo = deletionInfo; + this.clusterings = clusterings; + this.livenessInfos = livenessInfos; + this.deletions = deletions; + this.columns = columns; + this.data = data; + + collectStats(deletionInfo.getPartitionDeletion()); + Iterator<RangeTombstone> iter = deletionInfo.rangeIterator(false); + while (iter.hasNext()) + collectStats(iter.next().deletionTime()); + } + + protected AbstractPartitionData(CFMetaData metadata, + DecoratedKey key, + DeletionInfo deletionInfo, + PartitionColumns columns, + RowDataBlock data, + int initialRowCapacity) + { + this(metadata, + key, + deletionInfo, + new ByteBuffer[initialRowCapacity * metadata.clusteringColumns().size()], + new LivenessInfoArray(initialRowCapacity), + new DeletionTimeArray(initialRowCapacity), + columns, + data); + } + + protected AbstractPartitionData(CFMetaData metadata, + DecoratedKey key, + DeletionTime partitionDeletion, + PartitionColumns columns, + int initialRowCapacity, + boolean sortable) + { + this(metadata, + key, + new DeletionInfo(partitionDeletion.takeAlias()), + columns, + new RowDataBlock(columns.regulars, initialRowCapacity, sortable, metadata.isCounter()), + initialRowCapacity); + } + + private void collectStats(DeletionTime dt) + { + statsCollector.updateDeletionTime(dt); + maxTimestamp = Math.max(maxTimestamp, dt.markedForDeleteAt()); + } + + private void collectStats(LivenessInfo info) + { + statsCollector.updateTimestamp(info.timestamp()); + statsCollector.updateTTL(info.ttl()); + statsCollector.updateLocalDeletionTime(info.localDeletionTime()); + maxTimestamp = Math.max(maxTimestamp, info.timestamp()); + } + + public CFMetaData 
metadata() + { + return metadata; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return deletionInfo.getPartitionDeletion(); + } + + public PartitionColumns columns() + { + return columns; + } + + public Row staticRow() + { + return staticRow == null ? Rows.EMPTY_STATIC_ROW : staticRow; + } + + public RowStats stats() + { + return statsCollector.get(); + } + + /** + * The deletion info for the partition update. + * + * <b>warning:</b> the returned object should be used in a read-only fashion. In particular, + * it should not be used to add new range tombstones to this deletion. For that, + * {@link addRangeTombstone} should be used instead. The reason being that adding directly to + * the returned object would bypass some stats collection that {@code addRangeTombstone} does. + * + * @return the deletion info for the partition update for use as read-only. + */ + public DeletionInfo deletionInfo() + { + // TODO: it is a tad fragile that deletionInfo can be but shouldn't be modified. We + // could add the option of providing a read-only view of a DeletionInfo instead. + return deletionInfo; + } + + public void addPartitionDeletion(DeletionTime deletionTime) + { + collectStats(deletionTime); + deletionInfo.add(deletionTime); + } + + public void addRangeTombstone(Slice deletedSlice, DeletionTime deletion) + { + addRangeTombstone(new RangeTombstone(deletedSlice, deletion.takeAlias())); + } + + public void addRangeTombstone(RangeTombstone range) + { + collectStats(range.deletionTime()); + deletionInfo.add(range, metadata.comparator); + } + + /** + * Swap row i and j. + * + * This is only used when we need to reorder rows because those were not added in clustering order, + * which happens in {@link PartitionUpdate#sort} and {@link ArrayBackedPartition#create}. 
This method + * is public only because {@code PartitionUpdate} needs to implement {@link Sorting.Sortable}, but + * it should really only be used by subclasses (and with care) in practice. + */ + public void swap(int i, int j) + { + int cs = metadata.clusteringColumns().size(); + for (int k = 0; k < cs; k++) + { + ByteBuffer tmp = clusterings[j * cs + k]; + clusterings[j * cs + k] = clusterings[i * cs + k]; + clusterings[i * cs + k] = tmp; + } + + livenessInfos.swap(i, j); + deletions.swap(i, j); + data.swap(i, j); + } + + public int rowCount() + { + return rows; + } + + public boolean isEmpty() + { + return deletionInfo.isLive() && rows == 0 && staticRow().isEmpty(); + } + + protected void clear() + { + rows = 0; + Arrays.fill(clusterings, null); + livenessInfos.clear(); + deletions.clear(); + data.clear(); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + CFMetaData metadata = metadata(); + sb.append(String.format("Partition[%s.%s] key=%s columns=%s deletion=%s", + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(partitionKey().getKey()), + columns(), + deletionInfo)); + + if (staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(staticRow().toString(metadata, true)); + + // We use createRowIterator() directly instead of iterator() because that avoids + // sorting for PartitionUpdate (which inherit this method) and that is useful because + // 1) it can help with debugging and 2) we can't write after sorting but we want to + // be able to print an update while we build it (again for debugging) + Iterator<Row> iterator = createRowIterator(null, false); + while (iterator.hasNext()) + sb.append("\n ").append(iterator.next().toString(metadata, true)); + + return sb.toString(); + } + + protected void reverse() + { + for (int i = 0; i < rows / 2; i++) + swap(i, rows - 1 - i); + } + + public Row getRow(Clustering clustering) + { + Row row = searchIterator(ColumnFilter.selection(columns()), 
false).next(clustering); + // Note that for statics, this will never return null, this will return an empty row. However, + // it's more consistent for this method to return null if we don't really have a static row. + return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; + } + + /** + * Returns an iterator that iterates over the rows of this update in clustering order. + * + * @return an iterator over the rows of this update. + */ + public Iterator<Row> iterator() + { + return createRowIterator(null, false); + } + + public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed) + { + final RowIterator iter = createRowIterator(columns, reversed); + return new SearchIterator<Clustering, Row>() + { + public boolean hasNext() + { + return iter.hasNext(); + } + + public Row next(Clustering key) + { + if (key == Clustering.STATIC_CLUSTERING) + { + if (columns.fetchedColumns().statics.isEmpty() || staticRow().isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + return FilteringRow.columnsFilteringRow(columns).setTo(staticRow()); + } + + return iter.seekTo(key) ? 
iter.next() : null; + } + }; + } + + public UnfilteredRowIterator unfilteredIterator() + { + return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); + } + + public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed) + { + return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed)); + } + + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() + { + return sliceableUnfilteredIterator(ColumnFilter.selection(columns()), false); + } + + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(final ColumnFilter selection, final boolean reversed) + { + return new AbstractSliceableIterator(this, selection.fetchedColumns(), reversed) + { + private final RowIterator rowIterator = createRowIterator(selection, reversed); + private RowAndTombstoneMergeIterator mergeIterator = new RowAndTombstoneMergeIterator(metadata.comparator, reversed); + + protected Unfiltered computeNext() + { + if (!mergeIterator.isSet()) + mergeIterator.setTo(rowIterator, deletionInfo.rangeIterator(reversed)); + + return mergeIterator.hasNext() ? mergeIterator.next() : endOfData(); + } + + public Iterator<Unfiltered> slice(Slice slice) + { + return mergeIterator.setTo(rowIterator.slice(slice), deletionInfo.rangeIterator(slice, reversed)); + } + }; + } + + private RowIterator createRowIterator(ColumnFilter columns, boolean reversed) + { + return reversed ? new ReverseRowIterator(columns) : new ForwardRowIterator(columns); + } + + /** + * An iterator over the rows of this partition that reuse the same row object. 
+ */ + private abstract class RowIterator extends UnmodifiableIterator<Row> + { + protected final InternalReusableClustering clustering = new InternalReusableClustering(); + protected final InternalReusableRow reusableRow; + protected final FilteringRow filter; + + protected int next; + + protected RowIterator(final ColumnFilter columns) + { + this.reusableRow = new InternalReusableRow(clustering); + this.filter = columns == null ? null : FilteringRow.columnsFilteringRow(columns); + } + + /* + * Move the iterator so that row {@code name} is returned next by {@code next} if that + * row exists. Otherwise the first row sorting after {@code name} will be returned. + * Returns whether {@code name} was found or not. + */ + public abstract boolean seekTo(Clustering name); + + public abstract Iterator<Row> slice(Slice slice); + + protected Row setRowTo(int row) + { + reusableRow.setTo(row); + return filter == null ? reusableRow : filter.setTo(reusableRow); + } + + /** + * Simple binary search. + */ + protected int binarySearch(ClusteringPrefix name, int fromIndex, int toIndex) + { + int low = fromIndex; + int mid = toIndex; + int high = mid - 1; + int result = -1; + while (low <= high) + { + mid = (low + high) >> 1; + if ((result = metadata.comparator.compare(name, clustering.setTo(mid))) > 0) + low = mid + 1; + else if (result == 0) + return mid; + else + high = mid - 1; + } + return -mid - (result < 0 ? 1 : 2); + } + } + + private class ForwardRowIterator extends RowIterator + { + private ForwardRowIterator(ColumnFilter columns) + { + super(columns); + this.next = 0; + } + + public boolean hasNext() + { + return next < rows; + } + + public Row next() + { + return setRowTo(next++); + } + + public boolean seekTo(Clustering name) + { + if (next >= rows) + return false; + + int idx = binarySearch(name, next, rows); + next = idx >= 0 ? 
idx : -idx - 1; + return idx >= 0; + } + + public Iterator<Row> slice(Slice slice) + { + int sidx = binarySearch(slice.start(), next, rows); + final int start = sidx >= 0 ? sidx : -sidx - 1; + if (start >= rows) + return Collections.emptyIterator(); + + int eidx = binarySearch(slice.end(), start, rows); + // The insertion point is the first element greater than slice.end(), so we want the previous index + final int end = eidx >= 0 ? eidx : -eidx - 2; + + // Remember the end to speed up potential further slice search + next = end; + + if (start > end) + return Collections.emptyIterator(); + + return new AbstractIterator<Row>() + { + private int i = start; + + protected Row computeNext() + { + if (i >= rows || i > end) + return endOfData(); + + return setRowTo(i++); + } + }; + } + } + + private class ReverseRowIterator extends RowIterator + { + private ReverseRowIterator(ColumnFilter columns) + { + super(columns); + this.next = rows - 1; + } + + public boolean hasNext() + { + return next >= 0; + } + + public Row next() + { + return setRowTo(next--); + } + + public boolean seekTo(Clustering name) + { + // We only use that method with forward iterators. + throw new UnsupportedOperationException(); + } + + public Iterator<Row> slice(Slice slice) + { + int sidx = binarySearch(slice.end(), 0, next + 1); + // The insertion point is the first element greater than slice.end(), so we want the previous index + final int start = sidx >= 0 ? sidx : -sidx - 2; + if (start < 0) + return Collections.emptyIterator(); + + int eidx = binarySearch(slice.start(), 0, start + 1); + final int end = eidx >= 0 ? 
eidx : -eidx - 1; + + // Remember the end to speed up potential further slice search + next = end; + + if (start < end) + return Collections.emptyIterator(); + + return new AbstractIterator<Row>() + { + private int i = start; + + protected Row computeNext() + { + if (i < 0 || i < end) + return endOfData(); + + return setRowTo(i--); + } + }; + } + } + + /** + * A reusable view over the clustering of this partition. + */ + protected class InternalReusableClustering extends Clustering + { + final int size = metadata.clusteringColumns().size(); + private int base; + + public int size() + { + return size; + } + + public Clustering setTo(int row) + { + base = row * size; + return this; + } + + public ByteBuffer get(int i) + { + return clusterings[base + i]; + } + + public ByteBuffer[] getRawValues() + { + ByteBuffer[] values = new ByteBuffer[size]; + for (int i = 0; i < size; i++) + values[i] = get(i); + return values; + } + }; + + /** + * A reusable view over the rows of this partition. + */ + protected class InternalReusableRow extends AbstractReusableRow + { + private final LivenessInfoArray.Cursor liveness = new LivenessInfoArray.Cursor(); + private final DeletionTimeArray.Cursor deletion = new DeletionTimeArray.Cursor(); + private final InternalReusableClustering clustering; + + private int row; + + public InternalReusableRow() + { + this(new InternalReusableClustering()); + } + + public InternalReusableRow(InternalReusableClustering clustering) + { + this.clustering = clustering; + } + + protected RowDataBlock data() + { + return data; + } + + public Row setTo(int row) + { + this.clustering.setTo(row); + this.liveness.setTo(livenessInfos, row); + this.deletion.setTo(deletions, row); + this.row = row; + return this; + } + + protected int row() + { + return row; + } + + public Clustering clustering() + { + return clustering; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return liveness; + } + + public DeletionTime deletion() + { + return deletion; + } + 
}; + + private static abstract class AbstractSliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator + { + private AbstractSliceableIterator(AbstractPartitionData data, PartitionColumns columns, boolean isReverseOrder) + { + super(data.metadata, data.key, data.partitionLevelDeletion(), columns, data.staticRow(), isReverseOrder, data.stats()); + } + } + + /** + * A row writer to add rows to this partition. + */ + protected class Writer extends RowDataBlock.Writer + { + private int clusteringBase; + + private int simpleColumnsSetInRow; + private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>(); + + public Writer(boolean inOrderCells) + { + super(data, inOrderCells); + } + + public void writeClusteringValue(ByteBuffer value) + { + ensureCapacity(row); + clusterings[clusteringBase++] = value; + } + + public void writePartitionKeyLivenessInfo(LivenessInfo info) + { + ensureCapacity(row); + livenessInfos.set(row, info); + collectStats(info); + } + + public void writeRowDeletion(DeletionTime deletion) + { + ensureCapacity(row); + if (!deletion.isLive()) + deletions.set(row, deletion); + + collectStats(deletion); + } + + @Override + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + ensureCapacity(row); + collectStats(info); + + if (column.isComplex()) + complexColumnsSetInRow.add(column); + else + ++simpleColumnsSetInRow; + + super.writeCell(column, isCounter, value, info, path); + } + + @Override + public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion) + { + ensureCapacity(row); + collectStats(complexDeletion); + + super.writeComplexDeletion(c, complexDeletion); + } + + @Override + public void endOfRow() + { + super.endOfRow(); + ++rows; + + statsCollector.updateColumnSetPerRow(simpleColumnsSetInRow + complexColumnsSetInRow.size()); + + simpleColumnsSetInRow = 0; + complexColumnsSetInRow.clear(); + } + + public 
int currentRow() + { + return row; + } + + private void ensureCapacity(int rowToSet) + { + int originalCapacity = livenessInfos.size(); + if (rowToSet < originalCapacity) + return; + + int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, rowToSet); + + int clusteringSize = metadata.clusteringColumns().size(); + + clusterings = Arrays.copyOf(clusterings, newCapacity * clusteringSize); + + livenessInfos.resize(newCapacity); + deletions.resize(newCapacity); + } + + @Override + public Writer reset() + { + super.reset(); + clusteringBase = 0; + simpleColumnsSetInRow = 0; + complexColumnsSetInRow.clear(); + return this; + } + } + + /** + * A range tombstone marker writer to add range tombstone markers to this partition. + */ + protected class RangeTombstoneCollector implements RangeTombstoneMarker.Writer + { + private final boolean reversed; + + private final ByteBuffer[] nextValues = new ByteBuffer[metadata().comparator.size()]; + private int size; + private RangeTombstone.Bound.Kind nextKind; + + private Slice.Bound openBound; + private DeletionTime openDeletion; + + public RangeTombstoneCollector(boolean reversed) + { + this.reversed = reversed; + } + + public void writeClusteringValue(ByteBuffer value) + { + nextValues[size++] = value; + } + + public void writeBoundKind(RangeTombstone.Bound.Kind kind) + { + nextKind = kind; + } + + private ByteBuffer[] getValues() + { + return Arrays.copyOfRange(nextValues, 0, size); + } + + private void open(RangeTombstone.Bound.Kind kind, DeletionTime deletion) + { + openBound = Slice.Bound.create(kind, getValues()); + openDeletion = deletion.takeAlias(); + } + + private void close(RangeTombstone.Bound.Kind kind, DeletionTime deletion) + { + assert deletion.equals(openDeletion) : "Expected " + openDeletion + " but was " + deletion; + Slice.Bound closeBound = Slice.Bound.create(kind, getValues()); + Slice slice = reversed + ? 
Slice.make(closeBound, openBound) + : Slice.make(openBound, closeBound); + addRangeTombstone(slice, openDeletion); + } + + public void writeBoundDeletion(DeletionTime deletion) + { + assert !nextKind.isBoundary(); + if (nextKind.isOpen(reversed)) + open(nextKind, deletion); + else + close(nextKind, deletion); + } + + public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion) + { + assert nextKind.isBoundary(); + DeletionTime closeTime = reversed ? startDeletion : endDeletion; + DeletionTime openTime = reversed ? endDeletion : startDeletion; + + close(nextKind.closeBoundOfBoundary(reversed), closeTime); + open(nextKind.openBoundOfBoundary(reversed), openTime); + } + + public void endOfMarker() + { + clear(); + } + + private void addRangeTombstone(Slice deletionSlice, DeletionTime dt) + { + AbstractPartitionData.this.addRangeTombstone(deletionSlice, dt); + } + + private void clear() + { + size = 0; + Arrays.fill(nextValues, null); + nextKind = null; + } + + public void reset() + { + openBound = null; + openDeletion = null; + clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java new file mode 100644 index 0000000..d615ea9 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AbstractUnfilteredPartitionIterator.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +public abstract class AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator +{ + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java new file mode 100644 index 0000000..68b3970 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; + +public class ArrayBackedCachedPartition extends ArrayBackedPartition implements CachedPartition +{ + private final int createdAtInSec; + + // Note that those fields are really immutable, but we can't easily pass their values to + // the ctor so they are not final. + private int cachedLiveRows; + private int rowsWithNonExpiringCells; + + private int nonTombstoneCellCount; + private int nonExpiringLiveCells; + + private ArrayBackedCachedPartition(CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime deletionTime, + PartitionColumns columns, + int initialRowCapacity, + boolean sortable, + int createdAtInSec) + { + super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable); + this.createdAtInSec = createdAtInSec; + } + + /** + * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator got gather in memory. + * @param nowInSec the time of the creation in seconds. 
This is the time at which {@link #cachedLiveRows} applies. + * @return the created partition. + */ + public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec) + { + return create(iterator, 4, nowInSec); + } + + /** + * Creates an {@code ArrayBackedCachedPartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator got gather in memory. + * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally + * correspond or be a good estimation of the number or rows in {@code iterator}. + * @param nowInSec the time of the creation in seconds. This is the time at which {@link #cachedLiveRows} applies. + * @return the created partition. + */ + public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec) + { + ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(iterator.metadata(), + iterator.partitionKey(), + iterator.partitionLevelDeletion(), + iterator.columns(), + initialRowCapacity, + iterator.isReverseOrder(), + nowInSec); + + partition.staticRow = iterator.staticRow().takeAlias(); + + Writer writer = partition.new Writer(nowInSec); + RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder()); + + copyAll(iterator, writer, markerCollector, partition); + + return partition; + } + + public Row lastRow() + { + if (rows == 0) + return null; + + return new InternalReusableRow().setTo(rows - 1); + } + + /** + * The number of rows that were live at the time the partition was cached. + * + * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need this. 
+ * + * @return the number of rows in this partition that were live at the time the + * partition was cached (this can be different from the number of live rows now + * due to expiring cells). + */ + public int cachedLiveRows() + { + return cachedLiveRows; + } + + /** + * The number of rows in this cached partition that have at least one non-expiring + * non-deleted cell. + * + * Note that this is generally not a very meaningful number, but this is used by + * {@link DataLimits#hasEnoughLiveData} as an optimization. + * + * @return the number of row that have at least one non-expiring non-deleted cell. + */ + public int rowsWithNonExpiringCells() + { + return rowsWithNonExpiringCells; + } + + public int nonTombstoneCellCount() + { + return nonTombstoneCellCount; + } + + public int nonExpiringLiveCells() + { + return nonExpiringLiveCells; + } + + // Writers that collect the values for 'cachedLiveRows', 'rowsWithNonExpiringCells', 'nonTombstoneCellCount' + // and 'nonExpiringLiveCells'. + protected class Writer extends AbstractPartitionData.Writer + { + private final int nowInSec; + + private boolean hasLiveData; + private boolean hasNonExpiringCell; + + protected Writer(int nowInSec) + { + super(true); + this.nowInSec = nowInSec; + } + + @Override + public void writePartitionKeyLivenessInfo(LivenessInfo info) + { + super.writePartitionKeyLivenessInfo(info); + if (info.isLive(nowInSec)) + hasLiveData = true; + } + + @Override + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + super.writeCell(column, isCounter, value, info, path); + + if (info.isLive(nowInSec)) + { + hasLiveData = true; + if (!info.hasTTL()) + { + hasNonExpiringCell = true; + ++ArrayBackedCachedPartition.this.nonExpiringLiveCells; + } + } + + if (!info.hasLocalDeletionTime() || info.hasTTL()) + ++ArrayBackedCachedPartition.this.nonTombstoneCellCount; + } + + @Override + public void endOfRow() + { + super.endOfRow(); + if 
(hasLiveData) + ++ArrayBackedCachedPartition.this.cachedLiveRows; + if (hasNonExpiringCell) + ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells; + + hasLiveData = false; + hasNonExpiringCell = false; + } + } + + static class Serializer implements ISerializer<CachedPartition> + { + public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException + { + assert partition instanceof ArrayBackedCachedPartition; + ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition; + + out.writeInt(p.createdAtInSec); + try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) + { + UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rows); + } + } + + public CachedPartition deserialize(DataInput in) throws IOException + { + // Note that it would be slightly simpler to just do + // ArrayBackedCachedPartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...)); + // However deserializing the header separately is not a lot harder and allows us to: + // 1) get the capacity of the partition so we can size it properly directly + // 2) saves the creation of a temporary iterator: rows are directly written to the partition, which + // is slightly faster. 
+ + int createdAtInSec = in.readInt(); + + UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL); + assert !h.isReversed && h.rowEstimate >= 0; + + ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.rowEstimate, false, createdAtInSec); + partition.staticRow = h.staticRow; + + Writer writer = partition.new Writer(createdAtInSec); + RangeTombstoneMarker.Writer markerWriter = partition.new RangeTombstoneCollector(false); + + UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(MessagingService.current_version, SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter); + return partition; + } + + public long serializedSize(CachedPartition partition, TypeSizes sizes) + { + assert partition instanceof ArrayBackedCachedPartition; + ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition; + + try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) + { + return sizes.sizeof(p.createdAtInSec) + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rows, sizes); + } + } + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java new file mode 100644 index 0000000..d7f3a88 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.ISerializer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; + +public class ArrayBackedPartition extends AbstractPartitionData +{ + protected ArrayBackedPartition(CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime deletionTime, + PartitionColumns columns, + int initialRowCapacity, + boolean sortable) + { + super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable); + } + + /** + * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @return the created partition. 
+ */ + public static ArrayBackedPartition create(UnfilteredRowIterator iterator) + { + return create(iterator, 4); + } + + /** + * Creates an {@code ArrayBackedPartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally + * correspond to or be a good estimation of the number of rows in {@code iterator}. + * @return the created partition. + */ + public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity) + { + ArrayBackedPartition partition = new ArrayBackedPartition(iterator.metadata(), + iterator.partitionKey(), + iterator.partitionLevelDeletion(), + iterator.columns(), + initialRowCapacity, + iterator.isReverseOrder()); + + partition.staticRow = iterator.staticRow().takeAlias(); + + Writer writer = partition.new Writer(true); + RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder()); + + copyAll(iterator, writer, markerCollector, partition); + + return partition; + } + + protected static void copyAll(UnfilteredRowIterator iterator, Writer writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition) + { + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + ((Row) unfiltered).copyTo(writer); + else + ((RangeTombstoneMarker) unfiltered).copyTo(markerCollector); + } + + // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering + // order. So if we've just added them in reverse clustering order, reverse them. + if (iterator.isReverseOrder()) + partition.reverse(); + } +}
