http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceCommand.java b/src/java/org/apache/cassandra/db/RangeSliceCommand.java deleted file mode 100644 index 664eeee..0000000 --- a/src/java/org/apache/cassandra/db/RangeSliceCommand.java +++ /dev/null @@ -1,246 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import com.google.common.base.Objects; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.Schema; -import org.apache.cassandra.db.filter.ExtendedFilter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.pager.Pageable; - -public class RangeSliceCommand extends AbstractRangeCommand implements Pageable -{ - public static final RangeSliceCommandSerializer serializer = new RangeSliceCommandSerializer(); - - public final int maxResults; - public final boolean countCQL3Rows; - public final boolean isPaging; - - public RangeSliceCommand(String keyspace, - String columnFamily, - long timestamp, - IDiskAtomFilter predicate, - AbstractBounds<RowPosition> range, - int maxResults) - { - this(keyspace, columnFamily, timestamp, predicate, range, null, maxResults, false, false); - } - - public RangeSliceCommand(String keyspace, - String columnFamily, - long timestamp, - IDiskAtomFilter predicate, - AbstractBounds<RowPosition> range, - List<IndexExpression> row_filter, - int maxResults) - { - this(keyspace, columnFamily, timestamp, predicate, range, row_filter, maxResults, false, false); - } - - public RangeSliceCommand(String keyspace, - String columnFamily, - long timestamp, - IDiskAtomFilter predicate, - AbstractBounds<RowPosition> range, - List<IndexExpression> rowFilter, - int maxResults, - boolean countCQL3Rows, - boolean isPaging) - { - super(keyspace, columnFamily, timestamp, range, predicate, rowFilter); - this.maxResults = maxResults; - this.countCQL3Rows = countCQL3Rows; - this.isPaging = isPaging; - } - - public MessageOut<RangeSliceCommand> createMessage() - { - return new MessageOut<>(MessagingService.Verb.RANGE_SLICE, this, serializer); - } - - public AbstractRangeCommand forSubRange(AbstractBounds<RowPosition> subRange) - { - return new RangeSliceCommand(keyspace, - columnFamily, - timestamp, - predicate.cloneShallow(), - subRange, - rowFilter, - maxResults, - countCQL3Rows, - isPaging); - } - - public AbstractRangeCommand withUpdatedLimit(int newLimit) - { - return new RangeSliceCommand(keyspace, - columnFamily, - timestamp, - predicate.cloneShallow(), - keyRange, - rowFilter, - newLimit, - countCQL3Rows, - isPaging); - } - - public int limit() - { - return maxResults; - } - - public boolean countCQL3Rows() - { - return countCQL3Rows; - } - - public List<Row> executeLocally() - { - ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(columnFamily); - - ExtendedFilter exFilter = cfs.makeExtendedFilter(keyRange, predicate, rowFilter, maxResults, countCQL3Rows, isPaging, timestamp); - if (cfs.indexManager.hasIndexFor(rowFilter)) - return cfs.search(exFilter); - else - return cfs.getRangeSlice(exFilter); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("keyspace", keyspace) - .add("columnFamily", columnFamily) - .add("predicate", predicate) - .add("keyRange", keyRange) - .add("rowFilter", rowFilter) - .add("maxResults", maxResults) - .add("counterCQL3Rows", countCQL3Rows) - .add("timestamp", timestamp) - .toString(); - } -} - -class RangeSliceCommandSerializer implements IVersionedSerializer<RangeSliceCommand> -{ - public void serialize(RangeSliceCommand sliceCommand, DataOutputPlus out, int version) throws IOException - { - out.writeUTF(sliceCommand.keyspace); - out.writeUTF(sliceCommand.columnFamily); - out.writeLong(sliceCommand.timestamp); - - CFMetaData metadata = Schema.instance.getCFMetaData(sliceCommand.keyspace, sliceCommand.columnFamily); - - metadata.comparator.diskAtomFilterSerializer().serialize(sliceCommand.predicate, out, version); - - if (sliceCommand.rowFilter == null) - { - out.writeInt(0); - } - else - { - out.writeInt(sliceCommand.rowFilter.size()); - for (IndexExpression expr : sliceCommand.rowFilter) - { - expr.writeTo(out); - } - } - MessagingService.validatePartitioner(sliceCommand.keyRange); - AbstractBounds.rowPositionSerializer.serialize(sliceCommand.keyRange, out, version); - out.writeInt(sliceCommand.maxResults); - out.writeBoolean(sliceCommand.countCQL3Rows); - out.writeBoolean(sliceCommand.isPaging); - } - - public RangeSliceCommand deserialize(DataInput in, int version) throws IOException - { - String keyspace = in.readUTF(); - String columnFamily = in.readUTF(); - long timestamp = in.readLong(); - - CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, columnFamily); - if (metadata == null) - { - String message = String.format("Got range slice command for nonexistent table %s.%s. If the table was just " + - "created, this is likely due to the schema not being fully propagated. Please wait for schema " + - "agreement on table creation." , keyspace, columnFamily); - throw new UnknownColumnFamilyException(message, null); - } - - IDiskAtomFilter predicate = metadata.comparator.diskAtomFilterSerializer().deserialize(in, version); - - List<IndexExpression> rowFilter; - int filterCount = in.readInt(); - rowFilter = new ArrayList<>(filterCount); - for (int i = 0; i < filterCount; i++) - { - rowFilter.add(IndexExpression.readFrom(in)); - } - AbstractBounds<RowPosition> range = AbstractBounds.rowPositionSerializer.deserialize(in, MessagingService.globalPartitioner(), version); - - int maxResults = in.readInt(); - boolean countCQL3Rows = in.readBoolean(); - boolean isPaging = in.readBoolean(); - return new RangeSliceCommand(keyspace, columnFamily, timestamp, predicate, range, rowFilter, maxResults, countCQL3Rows, isPaging); - } - - public long serializedSize(RangeSliceCommand rsc, int version) - { - long size = TypeSizes.NATIVE.sizeof(rsc.keyspace); - size += TypeSizes.NATIVE.sizeof(rsc.columnFamily); - size += TypeSizes.NATIVE.sizeof(rsc.timestamp); - - CFMetaData metadata = Schema.instance.getCFMetaData(rsc.keyspace, rsc.columnFamily); - - IDiskAtomFilter filter = rsc.predicate; - - size += metadata.comparator.diskAtomFilterSerializer().serializedSize(filter, version); - - if (rsc.rowFilter == null) - { - size += TypeSizes.NATIVE.sizeof(0); - } - else - { - size += TypeSizes.NATIVE.sizeof(rsc.rowFilter.size()); - for (IndexExpression expr : rsc.rowFilter) - { - size += TypeSizes.NATIVE.sizeofWithShortLength(expr.column); - size += TypeSizes.NATIVE.sizeof(expr.operator.ordinal()); - size += TypeSizes.NATIVE.sizeofWithShortLength(expr.value); - } - } - size += AbstractBounds.rowPositionSerializer.serializedSize(rsc.keyRange, version); - size += TypeSizes.NATIVE.sizeof(rsc.maxResults); - size += TypeSizes.NATIVE.sizeof(rsc.countCQL3Rows); - size += TypeSizes.NATIVE.sizeof(rsc.isPaging); - return size; - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeSliceReply.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeSliceReply.java b/src/java/org/apache/cassandra/db/RangeSliceReply.java deleted file mode 100644 index ed1f523..0000000 --- a/src/java/org/apache/cassandra/db/RangeSliceReply.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db; - -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.lang3.StringUtils; - -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.FastByteArrayInputStream; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; - -public class RangeSliceReply -{ - public static final RangeSliceReplySerializer serializer = new RangeSliceReplySerializer(); - - public final List<Row> rows; - - public RangeSliceReply(List<Row> rows) - { - this.rows = rows; - } - - public MessageOut<RangeSliceReply> createMessage() - { - return new MessageOut<RangeSliceReply>(MessagingService.Verb.REQUEST_RESPONSE, this, serializer); - } - - @Override - public String toString() - { - return "RangeSliceReply{" + - "rows=" + StringUtils.join(rows, ",") + - '}'; - } - - public static RangeSliceReply read(byte[] body, int version) throws IOException - { - try (DataInputStream dis = new DataInputStream(new FastByteArrayInputStream(body))) - { - return serializer.deserialize(dis, version); - } - } - - private static class RangeSliceReplySerializer implements IVersionedSerializer<RangeSliceReply> - { - public void serialize(RangeSliceReply rsr, DataOutputPlus out, int version) throws IOException - { - out.writeInt(rsr.rows.size()); - for (Row row : rsr.rows) - Row.serializer.serialize(row, out, version); - } - - public RangeSliceReply deserialize(DataInput in, int version) throws IOException - { - int rowCount = in.readInt(); - List<Row> rows = new ArrayList<Row>(rowCount); - for (int i = 0; i < rowCount; i++) - rows.add(Row.serializer.deserialize(in, version)); - return new RangeSliceReply(rows); - } - - public long serializedSize(RangeSliceReply rsr, int version) - { - int size = TypeSizes.NATIVE.sizeof(rsr.rows.size()); - for (Row row : rsr.rows) - size += Row.serializer.serializedSize(row, version); - return size; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index da483fc..3373afa 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -19,12 +19,12 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.*; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.composites.CType; -import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.io.ISSTableSerializer; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataOutputBuffer; @@ -32,350 +32,142 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.Interval; -public class RangeTombstone extends Interval<Composite, DeletionTime> implements OnDiskAtom +/** + * A range tombstone is a tombstone that covers a slice/range of rows. + * <p> + * Note that in most of the storage engine, a range tombstone is actually represented by its separated + * opening and closing bound, see {@link RangeTombstoneMarker}. So in practice, this is only used when + * full partitions are materialized in memory in a {@code Partition} object, and more precisely through + * the use of a {@code RangeTombstoneList} in a {@code DeletionInfo} object. + */ +public class RangeTombstone { - public RangeTombstone(Composite start, Composite stop, long markedForDeleteAt, int localDeletionTime) - { - this(start, stop, new DeletionTime(markedForDeleteAt, localDeletionTime)); - } - - public RangeTombstone(Composite start, Composite stop, DeletionTime delTime) - { - super(start, stop, delTime); - } - - public Composite name() - { - return min; - } + private final Slice slice; + private final DeletionTime deletion; - public int getLocalDeletionTime() + public RangeTombstone(Slice slice, DeletionTime deletion) { - return data.localDeletionTime; + this.slice = slice; + this.deletion = deletion.takeAlias(); } - public long timestamp() + /** + * The slice of rows that is deleted by this range tombstone. + * + * @return the slice of rows that is deleted by this range tombstone. + */ + public Slice deletedSlice() { - return data.markedForDeleteAt; + return slice; } - public void validateFields(CFMetaData metadata) throws MarshalException + /** + * The deletion time for this (range) tombstone. + * + * @return the deletion time for this range tombstone. + */ + public DeletionTime deletionTime() { - metadata.comparator.validate(min); - metadata.comparator.validate(max); + return deletion; } - public void updateDigest(MessageDigest digest) + @Override + public boolean equals(Object other) { - digest.update(min.toByteBuffer().duplicate()); - digest.update(max.toByteBuffer().duplicate()); - - try (DataOutputBuffer buffer = new DataOutputBuffer()) - { - buffer.writeLong(data.markedForDeleteAt); - digest.update(buffer.getData(), 0, buffer.getLength()); - } - catch (IOException e) - { - throw new RuntimeException(e); - } - } - - /** - * This tombstone supersedes another one if it is more recent and cover a - * bigger range than rt. - */ - public boolean supersedes(RangeTombstone rt, Comparator<Composite> comparator) - { - if (rt.data.markedForDeleteAt > data.markedForDeleteAt) + if(!(other instanceof RangeTombstone)) return false; - return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0; + RangeTombstone that = (RangeTombstone)other; + return this.deletedSlice().equals(that.deletedSlice()) + && this.deletionTime().equals(that.deletionTime()); } - public boolean includes(Comparator<Composite> comparator, Composite name) + @Override + public int hashCode() { - return comparator.compare(name, min) >= 0 && comparator.compare(name, max) <= 0; + return Objects.hash(deletedSlice(), deletionTime()); } /** - * Tracks opened RangeTombstones when iterating over a partition. + * The bound of a range tombstone. * <p> - * This tracker must be provided all the atoms of a given partition in - * order (to the {@code update} method). Given this, it keeps enough - * information to be able to decide if one of an atom is deleted (shadowed) - * by a previously open RT. One the tracker can prove a given range - * tombstone cannot be useful anymore (that is, as soon as we've seen an - * atom that is after the end of that RT), it discards this RT. In other - * words, the maximum memory used by this object should be proportional to - * the maximum number of RT that can be simultaneously open (and this - * should fairly low in practice). + * This is the same than for a slice but it includes "boundaries" between ranges. A boundary simply condensed + * a close and an opening "bound" into a single object. There is 2 main reasons for these "shortcut" boundaries: + * 1) When merging multiple iterators having range tombstones (that are represented by their start and end markers), + * we need to know when a range is close on an iterator, if it is reopened right away. Otherwise, we cannot + * easily produce the markers on the merged iterators within risking to fail the sorting guarantees of an + * iterator. See this comment for more details: https://goo.gl/yyB5mR. + * 2) This saves some storage space. */ - public static class Tracker + public static class Bound extends Slice.Bound { - private final Comparator<Composite> comparator; - - // A list the currently open RTs. We keep the list sorted in order of growing end bounds as for a - // new atom, this allows to efficiently find the RTs that are now useless (if any). Also note that because - // atom are passed to the tracker in order, any RT that is tracked can be assumed as opened, i.e. we - // never have to test the RTs start since it's always assumed to be less than what we have. - // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and - // will be ignored by writeOpenedMarker. - private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>(); - - // Total number of atoms written by writeOpenedMarker(). - private int atomCount; + public static final Serializer serializer = new Serializer(); - /** - * Creates a new tracker given the table comparator. - * - * @param comparator the comparator for the table this will track atoms - * for. The tracker assumes that atoms will be later provided to the - * tracker in {@code comparator} order. - */ - public Tracker(Comparator<Composite> comparator) + public Bound(Kind kind, ByteBuffer[] values) { - this.comparator = comparator; + super(kind, values); } - /** - * Computes the RangeTombstone that are needed at the beginning of an index - * block starting with {@code firstColumn}. - * - * @return the total serialized size of said tombstones and write them to - * {@code out} it if isn't null. - */ - public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutputPlus out, OnDiskAtom.Serializer atomSerializer) throws IOException + public static RangeTombstone.Bound inclusiveOpen(boolean reversed, ByteBuffer[] boundValues) { - long size = 0; - if (openedTombstones.isEmpty()) - return size; - - /* - * Compute the markers that needs to be written at the beginning of - * this block. We need to write one if it is the more recent - * (opened) tombstone for at least some part of its range. - */ - List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>(); - outer: - for (RangeTombstone tombstone : openedTombstones) - { - // If the first column is outside the range, skip it (in case update() hasn't been called yet) - if (comparator.compare(firstColumn.name(), tombstone.max) > 0) - continue; - - if (tombstone instanceof ExpiredRangeTombstone) - continue; - - RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data); - - Iterator<RangeTombstone> iter = toWrite.iterator(); - while (iter.hasNext()) - { - RangeTombstone other = iter.next(); - if (other.supersedes(updated, comparator)) - break outer; - if (updated.supersedes(other, comparator)) - iter.remove(); - } - toWrite.add(tombstone); - } - - for (RangeTombstone tombstone : toWrite) - { - size += atomSerializer.serializedSizeForSSTable(tombstone); - atomCount++; - if (out != null) - atomSerializer.serializeForSSTable(tombstone, out); - } - return size; - } - - /** - * The total number of atoms written by calls to the method {@link #writeOpenedMarker}. - */ - public int writtenAtom() - { - return atomCount; - } - - /** - * Update this tracker given an {@code atom}. - * <p> - * This method first test if some range tombstone can be discarded due - * to the knowledge of that new atom. Then, if it's a range tombstone, - * it adds it to the tracker. - * <p> - * Note that this method should be called on *every* atom of a partition for - * the tracker to work as efficiently as possible (#9486). - */ - public void update(OnDiskAtom atom, boolean isExpired) - { - // Get rid of now useless RTs - ListIterator<RangeTombstone> iterator = openedTombstones.listIterator(); - while (iterator.hasNext()) - { - // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future - // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of any following atom too - // since maxOrderingSet is sorted by end bounds - RangeTombstone t = iterator.next(); - if (comparator.compare(atom.name(), t.max) > 0) - { - iterator.remove(); - } - else - { - // If the atom is a RT, we'll add it next and for that we want to start by looking at the atom we just - // returned, so rewind the iterator. - iterator.previous(); - break; - } - } - - // If it's a RT, adds it. - if (atom instanceof RangeTombstone) - { - RangeTombstone toAdd = (RangeTombstone)atom; - if (isExpired) - toAdd = new ExpiredRangeTombstone(toAdd); - - // We want to maintain openedTombstones in end bounds order so we find where to insert the new element - // and add it. While doing so, we also check if that new tombstone fully shadow or is fully shadowed - // by an existing tombstone so we avoid tracking more tombstone than necessary (and we know this will - // at least happend for start-of-index-block repeated range tombstones). - while (iterator.hasNext()) - { - RangeTombstone existing = iterator.next(); - int cmp = comparator.compare(toAdd.max, existing.max); - if (cmp > 0) - { - // the new one covers more than the existing one. If the new one happens to also supersedes - // the existing one, remove the existing one. In any case, we're not done yet. - if (toAdd.data.supersedes(existing.data)) - iterator.remove(); - } - else - { - // the new one is included in the existing one. If the new one supersedes the existing one, - // then we add the new one (and if the new one ends like the existing one, we can actually remove - // the existing one), otherwise we can actually ignore it. In any case, we're done. - if (toAdd.data.supersedes(existing.data)) - { - if (cmp == 0) - iterator.set(toAdd); - else - insertBefore(toAdd, iterator); - } - return; - } - } - // If we reach here, either we had no tombstones and the new one ends after all existing ones. - iterator.add(toAdd); - } + return new Bound(reversed ? Kind.INCL_END_BOUND : Kind.INCL_START_BOUND, boundValues); } - /** - * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}. - * <p> - * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that - * {@code iterator.hasPrevious() == true}. - */ - private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator) + public static RangeTombstone.Bound exclusiveOpen(boolean reversed, ByteBuffer[] boundValues) { - assert iterator.hasPrevious(); - iterator.previous(); - iterator.add(tombstone); - iterator.next(); + return new Bound(reversed ? Kind.EXCL_END_BOUND : Kind.EXCL_START_BOUND, boundValues); } - /** - * Tests if the provided column is deleted by one of the tombstone - * tracked by this tracker. - * <p> - * This method should be called on columns in the same order than for the update() - * method. Note that this method does not update the tracker so the update() method - * should still be called on {@code column} (it doesn't matter if update is called - * before or after this call). - */ - public boolean isDeleted(Cell cell) + public static RangeTombstone.Bound inclusiveClose(boolean reversed, ByteBuffer[] boundValues) { - // We know every tombstone kept are "open", start before the column. So the - // column is deleted if any of the tracked tombstone ends after the column - // (this will be the case of every RT if update() has been called before this - // method, but we might have a few RT to skip otherwise) and the RT deletion is - // actually more recent than the column timestamp. - for (RangeTombstone tombstone : openedTombstones) - { - if (comparator.compare(cell.name(), tombstone.max) <= 0 - && tombstone.timestamp() >= cell.timestamp()) - return true; - } - return false; + return new Bound(reversed ? Kind.INCL_START_BOUND : Kind.INCL_END_BOUND, boundValues); } - /** - * The tracker needs to track expired range tombstone but keep tracks that they are - * expired, so this is what this class is used for. - */ - private static class ExpiredRangeTombstone extends RangeTombstone + public static RangeTombstone.Bound exclusiveClose(boolean reversed, ByteBuffer[] boundValues) { - private ExpiredRangeTombstone(RangeTombstone tombstone) - { - super(tombstone.min, tombstone.max, tombstone.data); - } + return new Bound(reversed ? Kind.EXCL_START_BOUND : Kind.EXCL_END_BOUND, boundValues); } - } - - public static class Serializer implements ISSTableSerializer<RangeTombstone> - { - private final CType type; - public Serializer(CType type) + public static RangeTombstone.Bound inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues) { - this.type = type; + return new Bound(reversed ? Kind.EXCL_END_INCL_START_BOUNDARY : Kind.INCL_END_EXCL_START_BOUNDARY, boundValues); } - public void serializeForSSTable(RangeTombstone t, DataOutputPlus out) throws IOException + public static RangeTombstone.Bound exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues) { - type.serializer().serialize(t.min, out); - out.writeByte(ColumnSerializer.RANGE_TOMBSTONE_MASK); - type.serializer().serialize(t.max, out); - DeletionTime.serializer.serialize(t.data, out); + return new Bound(reversed ? Kind.INCL_END_EXCL_START_BOUNDARY : Kind.EXCL_END_INCL_START_BOUNDARY, boundValues); } - public RangeTombstone deserializeFromSSTable(DataInput in, Version version) throws IOException + @Override + public Bound withNewKind(Kind kind) { - Composite min = type.serializer().deserialize(in); - - int b = in.readUnsignedByte(); - assert (b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0; - return deserializeBody(in, min, version); + return new Bound(kind, values); } - public RangeTombstone deserializeBody(DataInput in, Composite min, Version version) throws IOException + public static class Serializer { - Composite max = type.serializer().deserialize(in); - DeletionTime dt = DeletionTime.serializer.deserialize(in); - // If the max equals the min.end(), we can avoid keeping an extra ByteBuffer in memory by using - // min.end() instead of max - Composite minEnd = min.end(); - max = minEnd.equals(max) ? minEnd : max; - return new RangeTombstone(min, max, dt); - } + public void serialize(RangeTombstone.Bound bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException + { + out.writeByte(bound.kind().ordinal()); + out.writeShort(bound.size()); + ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types); + } - public void skipBody(DataInput in, Version version) throws IOException - { - type.serializer().skip(in); - DeletionTime.serializer.skip(in); - } + public long serializedSize(RangeTombstone.Bound bound, int version, List<AbstractType<?>> types, TypeSizes sizes) + { + return 1 // kind ordinal + + sizes.sizeof((short)bound.size()) + + ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(bound, version, types, sizes); + } - public long serializedSizeForSSTable(RangeTombstone t) - { - TypeSizes typeSizes = TypeSizes.NATIVE; - return type.serializer().serializedSize(t.min, typeSizes) - + 1 // serialization flag - + type.serializer().serializedSize(t.max, typeSizes) - + DeletionTime.serializer.serializedSize(t.data, typeSizes); + public Kind deserialize(DataInput in, int version, List<AbstractType<?>> types, Writer writer) throws IOException + { + Kind kind = Kind.values()[in.readByte()]; + writer.writeBoundKind(kind); + int size = in.readUnsignedShort(); + ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, size, version, types, writer); + return kind; + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RangeTombstoneList.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstoneList.java b/src/java/org/apache/cassandra/db/RangeTombstoneList.java index 37f1ef4..0c27bc4 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstoneList.java +++ b/src/java/org/apache/cassandra/db/RangeTombstoneList.java @@ -20,9 +20,7 @@ package org.apache.cassandra.db; import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; -import java.security.MessageDigest; import java.util.Arrays; -import java.util.Comparator; import java.util.Iterator; import com.google.common.collect.AbstractIterator; @@ -31,12 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.cache.IMeasurableMemory; -import org.apache.cassandra.db.composites.CType; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.memory.AbstractAllocator; @@ -62,19 +57,19 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable private static long EMPTY_SIZE = ObjectSizes.measure(new RangeTombstoneList(null, 0)); - private final Comparator<Composite> comparator; + private final ClusteringComparator comparator; // Note: we don't want to use a List for the markedAts and delTimes to avoid boxing. We could // use a List for starts and ends, but having arrays everywhere is almost simpler. - private Composite[] starts; - private Composite[] ends; + private Slice.Bound[] starts; + private Slice.Bound[] ends; private long[] markedAts; private int[] delTimes; private long boundaryHeapSize; private int size; - private RangeTombstoneList(Comparator<Composite> comparator, Composite[] starts, Composite[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size) + private RangeTombstoneList(ClusteringComparator comparator, Slice.Bound[] starts, Slice.Bound[] ends, long[] markedAts, int[] delTimes, long boundaryHeapSize, int size) { assert starts.length == ends.length && starts.length == markedAts.length && starts.length == delTimes.length; this.comparator = comparator; @@ -86,9 +81,9 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable this.boundaryHeapSize = boundaryHeapSize; } - public RangeTombstoneList(Comparator<Composite> comparator, int capacity) + public RangeTombstoneList(ClusteringComparator comparator, int capacity) { - this(comparator, new Composite[capacity], new Composite[capacity], new long[capacity], new int[capacity], 0, 0); + this(comparator, new Slice.Bound[capacity], new Slice.Bound[capacity], new long[capacity], new int[capacity], 0, 0); } public boolean isEmpty() @@ -101,7 +96,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return size; } - public Comparator<Composite> comparator() + public ClusteringComparator comparator() { return comparator; } @@ -119,27 +114,36 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable public RangeTombstoneList copy(AbstractAllocator allocator) { RangeTombstoneList copy = new RangeTombstoneList(comparator, - new Composite[size], - new Composite[size], - Arrays.copyOf(markedAts, size), - Arrays.copyOf(delTimes, size), - boundaryHeapSize, size); + new Slice.Bound[size], + new Slice.Bound[size], + Arrays.copyOf(markedAts, size), + Arrays.copyOf(delTimes, size), + boundaryHeapSize, size); for (int i = 0; i < size; i++) { - assert !(starts[i] instanceof AbstractNativeCell || ends[i] instanceof AbstractNativeCell); //this should never happen - - copy.starts[i] = starts[i].copy(null, allocator); - copy.ends[i] = ends[i].copy(null, allocator); + copy.starts[i] = clone(starts[i], allocator); + copy.ends[i] = clone(ends[i], allocator); } return copy; } + private static Slice.Bound clone(Slice.Bound bound, AbstractAllocator allocator) + { + ByteBuffer[] values = new ByteBuffer[bound.size()]; + for (int i = 0; i < values.length; i++) + values[i] = allocator.clone(bound.get(i)); + return new Slice.Bound(bound.kind(), values); + } + public void add(RangeTombstone tombstone) { - add(tombstone.min, tombstone.max, tombstone.data.markedForDeleteAt, tombstone.data.localDeletionTime); + add(tombstone.deletedSlice().start(), + tombstone.deletedSlice().end(), + tombstone.deletionTime().markedForDeleteAt(), + tombstone.deletionTime().localDeletionTime()); } /** @@ -148,7 +152,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable * This method will be faster if the new tombstone sort after all the currently existing ones (this is a common use case), * but it doesn't assume it. */ - public void add(Composite start, Composite end, long markedAt, int delTime) + public void add(Slice.Bound start, Slice.Bound end, long markedAt, int delTime) { if (isEmpty()) { @@ -215,7 +219,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable int j = 0; while (i < size && j < tombstones.size) { - if (comparator.compare(tombstones.starts[j], ends[i]) <= 0) + if (comparator.compare(tombstones.starts[j], ends[i]) < 0) { insertFrom(i, tombstones.starts[j], tombstones.ends[j], tombstones.markedAts[j], tombstones.delTimes[j]); j++; @@ -235,34 +239,26 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable * Returns whether the given name/timestamp pair is deleted by one of the tombstone * of this RangeTombstoneList. */ - public boolean isDeleted(Cell cell) + public boolean isDeleted(Clustering clustering, Cell cell) { - int idx = searchInternal(cell.name(), 0); + int idx = searchInternal(clustering, 0, size); // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346. - return idx >= 0 && (cell instanceof CounterCell || markedAts[idx] >= cell.timestamp()); - } - - /** - * Returns a new {@link InOrderTester}. - */ - InOrderTester inOrderTester() - { - return new InOrderTester(); + return idx >= 0 && (cell.isCounterCell() || markedAts[idx] >= cell.livenessInfo().timestamp()); } /** * Returns the DeletionTime for the tombstone overlapping {@code name} (there can't be more than one), * or null if {@code name} is not covered by any tombstone. */ - public DeletionTime searchDeletionTime(Composite name) + public DeletionTime searchDeletionTime(Clustering name) { - int idx = searchInternal(name, 0); - return idx < 0 ? null : new DeletionTime(markedAts[idx], delTimes[idx]); + int idx = searchInternal(name, 0, size); + return idx < 0 ? null : new SimpleDeletionTime(markedAts[idx], delTimes[idx]); } - public RangeTombstone search(Composite name) + public RangeTombstone search(Clustering name) { - int idx = searchInternal(name, 0); + int idx = searchInternal(name, 0, size); return idx < 0 ? null : rangeTombstone(idx); } @@ -270,20 +266,15 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable * Return is the index of the range covering name if name is covered. If the return idx is negative, * no range cover name and -idx-1 is the index of the first range whose start is greater than name. */ - private int searchInternal(Composite name, int startIdx) + private int searchInternal(ClusteringPrefix name, int startIdx, int endIdx) { if (isEmpty()) return -1; - int pos = Arrays.binarySearch(starts, startIdx, size, name, comparator); + int pos = Arrays.binarySearch(starts, startIdx, endIdx, name, comparator); if (pos >= 0) { - // We're exactly on an interval start. The one subtility is that we need to check if - // the previous is not equal to us and doesn't have a higher marked at - if (pos > 0 && comparator.compare(name, ends[pos-1]) == 0 && markedAts[pos-1] > markedAts[pos]) - return pos-1; - else - return pos; + return pos; } else { @@ -308,93 +299,94 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return dataSize; } - public long minMarkedAt() + public void updateAllTimestamp(long timestamp) { - long min = Long.MAX_VALUE; for (int i = 0; i < size; i++) - min = Math.min(min, markedAts[i]); - return min; + markedAts[i] = timestamp; } - public long maxMarkedAt() + private RangeTombstone rangeTombstone(int idx) { - long max = Long.MIN_VALUE; - for (int i = 0; i < size; i++) - max = Math.max(max, markedAts[i]); - return max; + return new RangeTombstone(Slice.make(starts[idx], ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx])); } - public void updateAllTimestamp(long timestamp) + private RangeTombstone rangeTombstoneWithNewStart(int idx, Slice.Bound newStart) { - for (int i = 0; i < size; i++) - markedAts[i] = timestamp; + return new RangeTombstone(Slice.make(newStart, ends[idx]), new SimpleDeletionTime(markedAts[idx], delTimes[idx])); } - /** - * Removes all range tombstones whose local deletion time is older than gcBefore. - */ - public void purge(int gcBefore) + private RangeTombstone rangeTombstoneWithNewEnd(int idx, Slice.Bound newEnd) { - int j = 0; - for (int i = 0; i < size; i++) - { - if (delTimes[i] >= gcBefore) - setInternal(j++, starts[i], ends[i], markedAts[i], delTimes[i]); - } - size = j; + return new RangeTombstone(Slice.make(starts[idx], newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx])); } - /** - * Returns whether {@code purge(gcBefore)} would remove something or not. - */ - public boolean hasPurgeableTombstones(int gcBefore) + private RangeTombstone rangeTombstoneWithNewBounds(int idx, Slice.Bound newStart, Slice.Bound newEnd) { - for (int i = 0; i < size; i++) - { - if (delTimes[i] < gcBefore) - return true; - } - return false; + return new RangeTombstone(Slice.make(newStart, newEnd), new SimpleDeletionTime(markedAts[idx], delTimes[idx])); } - private RangeTombstone rangeTombstone(int idx) + public Iterator<RangeTombstone> iterator() { - return new RangeTombstone(starts[idx], ends[idx], markedAts[idx], delTimes[idx]); + return iterator(false); } - public Iterator<RangeTombstone> iterator() + public Iterator<RangeTombstone> iterator(boolean reversed) { - return new AbstractIterator<RangeTombstone>() - { - private int idx; + return reversed + ? new AbstractIterator<RangeTombstone>() + { + private int idx = size - 1; - protected RangeTombstone computeNext() - { - if (idx >= size) - return endOfData(); + protected RangeTombstone computeNext() + { + if (idx < 0) + return endOfData(); - return rangeTombstone(idx++); - } - }; + return rangeTombstone(idx--); + } + } + : new AbstractIterator<RangeTombstone>() + { + private int idx; + + protected RangeTombstone computeNext() + { + if (idx >= size) + return endOfData(); + + return rangeTombstone(idx++); + } + }; } - public Iterator<RangeTombstone> iterator(Composite from, Composite till) + public Iterator<RangeTombstone> iterator(final Slice slice, boolean reversed) { - int startIdx = from.isEmpty() ? 0 : searchInternal(from, 0); + return reversed ? reverseIterator(slice) : forwardIterator(slice); + } + + private Iterator<RangeTombstone> forwardIterator(final Slice slice) + { + int startIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, size); final int start = startIdx < 0 ? -startIdx-1 : startIdx; if (start >= size) return Iterators.<RangeTombstone>emptyIterator(); - int finishIdx = till.isEmpty() ? size : searchInternal(till, start); - // if stopIdx is the first range after 'till' we care only until the previous range + int finishIdx = slice.end() == Slice.Bound.TOP ? size - 1 : searchInternal(slice.end(), start, size); + // if stopIdx is the first range after 'slice.end()' we care only until the previous range final int finish = finishIdx < 0 ? -finishIdx-2 : finishIdx; - // Note: the following is true because we know 'from' is before 'till' in sorted order. if (start > finish) return Iterators.<RangeTombstone>emptyIterator(); - else if (start == finish) - return Iterators.<RangeTombstone>singletonIterator(rangeTombstone(start)); + + if (start == finish) + { + // We want to make sure the range are stricly included within the queried slice as this + // make it easier to combine things when iterating over successive slices. + Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start]; + Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start]; + return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e)); + } return new AbstractIterator<RangeTombstone>() { @@ -405,76 +397,63 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable if (idx >= size || idx > finish) return endOfData(); + // We want to make sure the range are stricly included within the queried slice as this + // make it easier to combine things when iterating over successive slices. This means that + // for the first and last range we might have to "cut" the range returned. + if (idx == start && comparator.compare(starts[idx], slice.start()) < 0) + return rangeTombstoneWithNewStart(idx++, slice.start()); + if (idx == finish && comparator.compare(slice.end(), ends[idx]) < 0) + return rangeTombstoneWithNewEnd(idx++, slice.end()); return rangeTombstone(idx++); } }; } - /** - * Evaluates a diff between superset (known to be all merged tombstones) and this list for read repair - * - * @return null if there is no difference - */ - public RangeTombstoneList diff(RangeTombstoneList superset) + private Iterator<RangeTombstone> reverseIterator(final Slice slice) { - if (isEmpty()) - return superset; + int startIdx = slice.end() == Slice.Bound.TOP ? 0 : searchInternal(slice.end(), 0, size); + // if startIdx is the first range after 'slice.end()' we care only until the previous range + final int start = startIdx < 0 ? -startIdx-2 : startIdx; - RangeTombstoneList diff = null; - - int j = 0; // index to iterate through our own list - for (int i = 0; i < superset.size; i++) - { - // we can assume that this list is a subset of the superset list - while (j < size && comparator.compare(starts[j], superset.starts[i]) < 0) - j++; + if (start >= size) + return Iterators.<RangeTombstone>emptyIterator(); - if (j >= size) - { - // we're at the end of our own list, add the remainder of the superset to the diff - if (i < superset.size) - { - if (diff == null) - diff = new RangeTombstoneList(comparator, superset.size - i); + int finishIdx = slice.start() == Slice.Bound.BOTTOM ? 0 : searchInternal(slice.start(), 0, start); + // if stopIdx is the first range after 'slice.end()' we care only until the previous range + final int finish = finishIdx < 0 ? -finishIdx-1 : finishIdx; - for(int k = i; k < superset.size; k++) - diff.add(superset.starts[k], superset.ends[k], superset.markedAts[k], superset.delTimes[k]); - } - return diff; - } + if (start < finish) + return Iterators.<RangeTombstone>emptyIterator(); - // we don't care about local deletion time here, because it doesn't matter for read repair - if (!starts[j].equals(superset.starts[i]) - || !ends[j].equals(superset.ends[i]) - || markedAts[j] != superset.markedAts[i]) - { - if (diff == null) - diff = new RangeTombstoneList(comparator, Math.min(8, superset.size - i)); - diff.add(superset.starts[i], superset.ends[i], superset.markedAts[i], superset.delTimes[i]); - } + if (start == finish) + { + // We want to make sure the range are stricly included within the queried slice as this + // make it easier to combine things when iterator over successive slices. + Slice.Bound s = comparator.compare(starts[start], slice.start()) < 0 ? slice.start() : starts[start]; + Slice.Bound e = comparator.compare(slice.end(), ends[start]) < 0 ? slice.end() : ends[start]; + return Iterators.<RangeTombstone>singletonIterator(rangeTombstoneWithNewBounds(start, s, e)); } - return diff; - } - - /** - * Calculates digest for triggering read repair on mismatch - */ - public void updateDigest(MessageDigest digest) - { - ByteBuffer longBuffer = ByteBuffer.allocate(8); - for (int i = 0; i < size; i++) + return new AbstractIterator<RangeTombstone>() { - for (int j = 0; j < starts[i].size(); j++) - digest.update(starts[i].get(j).duplicate()); - for (int j = 0; j < ends[i].size(); j++) - digest.update(ends[i].get(j).duplicate()); + private int idx = start; - longBuffer.putLong(0, markedAts[i]); - digest.update(longBuffer.array(), 0, 8); - } - } + protected RangeTombstone computeNext() + { + if (idx < 0 || idx < finish) + return endOfData(); + // We want to make sure the range are stricly included within the queried slice as this + // make it easier to combine things when iterator over successive slices. This means that + // for the first and last range we might have to "cut" the range returned. + if (idx == start && comparator.compare(slice.end(), ends[idx]) < 0) + return rangeTombstoneWithNewEnd(idx--, slice.end()); + if (idx == finish && comparator.compare(starts[idx], slice.start()) < 0) + return rangeTombstoneWithNewStart(idx--, slice.start()); + return rangeTombstone(idx++); + } + }; + } @Override public boolean equals(Object o) @@ -525,51 +504,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable /* * Inserts a new element starting at index i. This method assumes that: - * ends[i-1] <= start <= ends[i] + * ends[i-1] < start < ends[i] + * (note that we cannot have start == end since both will at least have a different bound "kind") * * A RangeTombstoneList is a list of range [s_0, e_0]...[s_n, e_n] such that: * - s_i <= e_i - * - e_i <= s_i+1 - * - if s_i == e_i and e_i == s_i+1 then s_i+1 < e_i+1 - * Basically, range are non overlapping except for their bound and in order. And while - * we allow ranges with the same value for the start and end, we don't allow repeating - * such range (so we can't have [0, 0][0, 0] even though it would respect the first 2 - * conditions). - * + * - e_i < s_i+1 + * Basically, range are non overlapping and in order. */ - private void insertFrom(int i, Composite start, Composite end, long markedAt, int delTime) + private void insertFrom(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime) { while (i < size) { - assert i == 0 || comparator.compare(ends[i-1], start) <= 0; + assert start.isStart() && end.isEnd(); + assert i == 0 || comparator.compare(ends[i-1], start) < 0; + assert comparator.compare(start, ends[i]) < 0; - int c = comparator.compare(start, ends[i]); - assert c <= 0; - if (c == 0) - { - // If start == ends[i], then we can insert from the next one (basically the new element - // really start at the next element), except for the case where starts[i] == ends[i]. - // In this latter case, if we were to move to next element, we could end up with ...[x, x][x, x]... - if (comparator.compare(starts[i], ends[i]) == 0) - { - // The current element cover a single value which is equal to the start of the inserted - // element. If the inserted element overwrites the current one, just remove the current - // (it's included in what we insert) and proceed with the insert. - if (markedAt > markedAts[i]) - { - removeInternal(i); - continue; - } - - // Otherwise (the current singleton interval override the new one), we want to leave the - // current element and move to the next, unless start == end since that means the new element - // is in fact fully covered by the current one (so we're done) - if (comparator.compare(start, end) == 0) - return; - } - i++; - continue; - } + if (Slice.isEmpty(comparator, start, end)) + return; // Do we overwrite the current element? if (markedAt > markedAts[i]) @@ -579,26 +531,24 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable // First deal with what might come before the newly added one. if (comparator.compare(starts[i], start) < 0) { - addInternal(i, starts[i], start, markedAts[i], delTimes[i]); - i++; - // We don't need to do the following line, but in spirit that's what we want to do - // setInternal(i, start, ends[i], markedAts, delTime]) + Slice.Bound newEnd = start.invert(); + if (!Slice.isEmpty(comparator, starts[i], newEnd)) + { + addInternal(i, starts[i], start.invert(), markedAts[i], delTimes[i]); + i++; + setInternal(i, start, ends[i], markedAts[i], delTimes[i]); + } } // now, start <= starts[i] - // Does the new element stops before/at the current one, + // Does the new element stops before the current one, int endCmp = comparator.compare(end, starts[i]); - if (endCmp <= 0) + if (endCmp < 0) { - // Here start <= starts[i] and end <= starts[i] - // This means the current element is before the current one. However, one special - // case is if end == starts[i] and starts[i] == ends[i]. In that case, - // the new element entirely overwrite the current one and we can just overwrite - if (endCmp == 0 && comparator.compare(starts[i], ends[i]) == 0) - setInternal(i, start, end, markedAt, delTime); - else - addInternal(i, start, end, markedAt, delTime); + // Here start <= starts[i] and end < starts[i] + // This means the current element is before the current one. + addInternal(i, start, end, markedAt, delTime); return; } @@ -617,20 +567,29 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return; } - setInternal(i, start, ends[i], markedAt, delTime); - if (cmp == 0) + // Otherwise, the new element overwite until the min(end, next start) + if (comparator.compare(end, starts[i+1]) < 0) + { + setInternal(i, start, end, markedAt, delTime); + // We have fully handled the new element so we're done return; + } - start = ends[i]; + setInternal(i, start, starts[i+1].invert(), markedAt, delTime); + start = starts[i+1]; i++; } else { - // We don't ovewrite fully. Insert the new interval, and then update the now next + // We don't overwrite fully. Insert the new interval, and then update the now next // one to reflect the not overwritten parts. We're then done. addInternal(i, start, end, markedAt, delTime); i++; - setInternal(i, end, ends[i], markedAts[i], delTimes[i]); + Slice.Bound newStart = end.invert(); + if (!Slice.isEmpty(comparator, newStart, ends[i])) + { + setInternal(i, newStart, ends[i], markedAts[i], delTimes[i]); + } return; } } @@ -644,13 +603,17 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable // If we stop before the start of the current element, just insert the new // interval and we're done; otherwise insert until the beginning of the // current element - if (comparator.compare(end, starts[i]) <= 0) + if (comparator.compare(end, starts[i]) < 0) { addInternal(i, start, end, markedAt, delTime); return; } - addInternal(i, start, starts[i], markedAt, delTime); - i++; + Slice.Bound newEnd = starts[i].invert(); + if (!Slice.isEmpty(comparator, start, newEnd)) + { + addInternal(i, start, newEnd, markedAt, delTime); + i++; + } } // After that, we're overwritten on the current element but might have @@ -660,7 +623,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable if (comparator.compare(end, ends[i]) <= 0) return; - start = ends[i]; + start = ends[i].invert(); i++; } } @@ -677,7 +640,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable /* * Adds the new tombstone at index i, growing and/or moving elements to make room for it. */ - private void addInternal(int i, Composite start, Composite end, long markedAt, int delTime) + private void addInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime) { assert i >= 0; @@ -730,12 +693,12 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable delTimes = grow(delTimes, size, newLength, i); } - private static Composite[] grow(Composite[] a, int size, int newLength, int i) + private static Slice.Bound[] grow(Slice.Bound[] a, int size, int newLength, int i) { if (i < 0 || i >= size) return Arrays.copyOf(a, newLength); - Composite[] newA = new Composite[newLength]; + Slice.Bound[] newA = new Slice.Bound[newLength]; System.arraycopy(a, 0, newA, 0, i); System.arraycopy(a, i, newA, i+1, size - i); return newA; @@ -780,7 +743,7 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable starts[i] = null; } - private void setInternal(int i, Composite start, Composite end, long markedAt, int delTime) + private void setInternal(int i, Slice.Bound start, Slice.Bound end, long markedAt, int delTime) { if (starts[i] != null) boundaryHeapSize -= starts[i].unsharedHeapSize() + ends[i].unsharedHeapSize(); @@ -802,82 +765,91 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable + ObjectSizes.sizeOfArray(delTimes); } + // TODO: This should be moved someplace else as it shouldn't be used directly: some ranges might become + // complex deletion times. We'll also only need this for backward compatibility, this isn't used otherwise. public static class Serializer implements IVersionedSerializer<RangeTombstoneList> { - private final CType type; + private final LegacyLayout layout; - public Serializer(CType type) + public Serializer(LegacyLayout layout) { - this.type = type; + this.layout = layout; } public void serialize(RangeTombstoneList tombstones, DataOutputPlus out, int version) throws IOException { - if (tombstones == null) - { - out.writeInt(0); - return; - } - - out.writeInt(tombstones.size); - for (int i = 0; i < tombstones.size; i++) - { - type.serializer().serialize(tombstones.starts[i], out); - type.serializer().serialize(tombstones.ends[i], out); - out.writeInt(tombstones.delTimes[i]); - out.writeLong(tombstones.markedAts[i]); - } + // TODO + throw new UnsupportedOperationException(); + //if (tombstones == null) + //{ + // out.writeInt(0); + // return; + //} + + //out.writeInt(tombstones.size); + //for (int i = 0; i < tombstones.size; i++) + //{ + // layout.serializer().serialize(tombstones.starts[i], out); + // layout.serializer().serialize(tombstones.ends[i], out); + // out.writeInt(tombstones.delTimes[i]); + // out.writeLong(tombstones.markedAts[i]); + //} } public RangeTombstoneList deserialize(DataInput in, int version) throws IOException { - int size = in.readInt(); - if (size == 0) - return null; - - RangeTombstoneList tombstones = new RangeTombstoneList(type, size); - - for (int i = 0; i < size; i++) - { - Composite start = type.serializer().deserialize(in); - Composite end = type.serializer().deserialize(in); - int delTime = in.readInt(); - long markedAt = in.readLong(); - - if (version >= MessagingService.VERSION_20) - { - tombstones.setInternal(i, start, end, markedAt, delTime); - } - else - { - /* - * The old implementation used to have range sorted by left value, but with potentially - * overlapping range. So we need to use the "slow" path. - */ - tombstones.add(start, end, markedAt, delTime); - } - } - - // The "slow" path take care of updating the size, but not the fast one - if (version >= MessagingService.VERSION_20) - tombstones.size = size; - return tombstones; + // TODO + throw new UnsupportedOperationException(); + + //int size = in.readInt(); + //if (size == 0) + // return null; + + //RangeTombstoneList tombstones = new RangeTombstoneList(layout, size); + + //for (int i = 0; i < size; i++) + //{ + // Slice.Bound start = layout.serializer().deserialize(in); + // Slice.Bound end = layout.serializer().deserialize(in); + // int delTime = in.readInt(); + // long markedAt = in.readLong(); + + // if (version >= MessagingService.VERSION_20) + // { + // tombstones.setInternal(i, start, end, markedAt, delTime); + // } + // else + // { + // /* + // * The old implementation used to have range sorted by left value, but with potentially + // * overlapping range. So we need to use the "slow" path. + // */ + // tombstones.add(start, end, markedAt, delTime); + // } + //} + + //// The "slow" path take care of updating the size, but not the fast one + //if (version >= MessagingService.VERSION_20) + // tombstones.size = size; + //return tombstones; } public long serializedSize(RangeTombstoneList tombstones, TypeSizes typeSizes, int version) { - if (tombstones == null) - return typeSizes.sizeof(0); - - long size = typeSizes.sizeof(tombstones.size); - for (int i = 0; i < tombstones.size; i++) - { - size += type.serializer().serializedSize(tombstones.starts[i], typeSizes); - size += type.serializer().serializedSize(tombstones.ends[i], typeSizes); - size += typeSizes.sizeof(tombstones.delTimes[i]); - size += typeSizes.sizeof(tombstones.markedAts[i]); - } - return size; + // TODO + throw new UnsupportedOperationException(); + //if (tombstones == null) + // return typeSizes.sizeof(0); + + //long size = typeSizes.sizeof(tombstones.size); + //for (int i = 0; i < tombstones.size; i++) + //{ + // size += type.serializer().serializedSize(tombstones.starts[i], typeSizes); + // size += type.serializer().serializedSize(tombstones.ends[i], typeSizes); + // size += typeSizes.sizeof(tombstones.delTimes[i]); + // size += typeSizes.sizeof(tombstones.markedAts[i]); + //} + //return size; } public long serializedSize(RangeTombstoneList tombstones, int version) @@ -885,56 +857,4 @@ public class RangeTombstoneList implements Iterable<RangeTombstone>, IMeasurable return serializedSize(tombstones, TypeSizes.NATIVE, version); } } - - /** - * This object allow testing whether a given column (name/timestamp) is deleted - * or not by this RangeTombstoneList, assuming that the column given to this - * object are passed in (comparator) sorted order. - * - * This is more efficient that calling RangeTombstoneList.isDeleted() repeatedly - * in that case since we're able to take the sorted nature of the RangeTombstoneList - * into account. - */ - public class InOrderTester - { - private int idx; - - public boolean isDeleted(Cell cell) - { - CellName name = cell.name(); - long timestamp = cell.timestamp(); - - while (idx < size) - { - int cmp = comparator.compare(name, starts[idx]); - - if (cmp < 0) - { - return false; - } - else if (cmp == 0) - { - // No matter what the counter cell's timestamp is, a tombstone always takes precedence. See CASSANDRA-7346. - if (cell instanceof CounterCell) - return true; - - // As for searchInternal, we need to check the previous end - if (idx > 0 && comparator.compare(name, ends[idx-1]) == 0 && markedAts[idx-1] > markedAts[idx]) - return markedAts[idx-1] >= timestamp; - else - return markedAts[idx] >= timestamp; - } - else - { - if (comparator.compare(name, ends[idx]) <= 0) - return markedAts[idx] >= timestamp || cell instanceof CounterCell; - else - idx++; - } - } - - return false; - } - } - }
