Repository: cassandra Updated Branches: refs/heads/trunk 36e4f4f3e -> 86c7785af
Make sure RangeTombstone.Tracker only keeps the ranges it needs to patch by slebresne; reviewed by benedict for CASSANDRA-9486 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b0dbea3d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b0dbea3d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b0dbea3d Branch: refs/heads/trunk Commit: b0dbea3dd0bebac97ddc14a847cce54cc38a7177 Parents: 8fcb620 Author: Sylvain Lebresne <[email protected]> Authored: Mon Jun 1 14:52:07 2015 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Jun 3 14:14:42 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/DeletionTime.java | 5 + .../org/apache/cassandra/db/RangeTombstone.java | 194 +++++++++++++------ .../db/compaction/LazilyCompactedRow.java | 3 + 4 files changed, 148 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c555a91..16ce060 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.16: + * Don't accumulate more range than necessary in RangeTombstone.Tracker (CASSANDRA-9486) * Add broadcast and rpc addresses to system.local (CASSANDRA-9436) * Always mark sstable suspect when corrupted (CASSANDRA-9478) * Add database users and permissions to CQL3 documentation (CASSANDRA-7558) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index dd2ccaf..b39d681 100644 --- 
a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -114,6 +114,11 @@ public class DeletionTime implements Comparable<DeletionTime> return column.timestamp() <= markedForDeleteAt; } + public boolean supersedes(DeletionTime dt) + { + return this.markedForDeleteAt > dt.markedForDeleteAt; + } + public long memorySize() { long fields = TypeSizes.NATIVE.sizeof(markedForDeleteAt) + TypeSizes.NATIVE.sizeof(localDeletionTime); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/RangeTombstone.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RangeTombstone.java b/src/java/org/apache/cassandra/db/RangeTombstone.java index 16fc27a..fe9da20 100644 --- a/src/java/org/apache/cassandra/db/RangeTombstone.java +++ b/src/java/org/apache/cassandra/db/RangeTombstone.java @@ -114,52 +114,73 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement return comparator.compare(min, rt.min) <= 0 && comparator.compare(max, rt.max) >= 0; } + /** + * Tracks opened RangeTombstones when iterating over a partition. + * <p> + * This tracker must be provided all the atoms of a given partition in + * order (to the {@code update} method). Given this, it keeps enough + * information to be able to decide if an atom is deleted (shadowed) + * by a previously open RT. Once the tracker can prove a given range + * tombstone cannot be useful anymore (that is, as soon as we've seen an + * atom that is after the end of that RT), it discards this RT. In other + * words, the maximum memory used by this object should be proportional to + * the maximum number of RTs that can be simultaneously open (and this + * should be fairly low in practice). 
+ */ public static class Tracker { private final Comparator<ByteBuffer> comparator; - private final Deque<RangeTombstone> ranges = new ArrayDeque<RangeTombstone>(); - private final SortedSet<RangeTombstone> maxOrderingSet = new TreeSet<RangeTombstone>(new Comparator<RangeTombstone>() - { - public int compare(RangeTombstone t1, RangeTombstone t2) - { - return comparator.compare(t1.max, t2.max); - } - }); - public final Set<RangeTombstone> expired = new HashSet<RangeTombstone>(); + + // A list of the currently open RTs. We keep the list sorted in order of growing end bounds as for a + // new atom, this allows to efficiently find the RTs that are now useless (if any). Also note that because + // atoms are passed to the tracker in order, any RT that is tracked can be assumed as opened, i.e. we + // never have to test the RT's start since it's always assumed to be less than what we have. + // Also note that this will store expired RTs (#7810). Those will be of type ExpiredRangeTombstone and + // will be ignored by writeOpenedMarker. + private final List<RangeTombstone> openedTombstones = new LinkedList<RangeTombstone>(); + + // Total number of atoms written by writeOpenedMarker(). private int atomCount; + /** + * Creates a new tracker given the table comparator. + * + * @param comparator the comparator for the table this will track atoms + * for. The tracker assumes that atoms will be later provided to the + * tracker in {@code comparator} order. + */ public Tracker(Comparator<ByteBuffer> comparator) { this.comparator = comparator; } /** - * Compute RangeTombstone that are needed at the beginning of an index + * Computes the RangeTombstones that are needed at the beginning of an index * block starting with {@code firstColumn}. - * Returns the total serialized size of said tombstones and write them - * to {@code out} it if isn't null. + * + * @return the total serialized size of said tombstones, which are also written to + * {@code out} if it isn't null. 
+ */ public long writeOpenedMarker(OnDiskAtom firstColumn, DataOutput out, OnDiskAtom.Serializer atomSerializer) throws IOException { long size = 0; - if (ranges.isEmpty()) + if (openedTombstones.isEmpty()) return size; /* - * Compute the marker that needs to be written at the beginning of - * this block. We need to write one if it the more recent + * Compute the markers that need to be written at the beginning of + * this block. We need to write one if it is the more recent * (opened) tombstone for at least some part of its range. */ List<RangeTombstone> toWrite = new LinkedList<RangeTombstone>(); outer: - for (RangeTombstone tombstone : ranges) + for (RangeTombstone tombstone : openedTombstones) { - // If ever the first column is outside the range, skip it (in - // case update() hasn't been called yet) + // If the first column is outside the range, skip it (in case update() hasn't been called yet) if (comparator.compare(firstColumn.name(), tombstone.max) > 0) continue; - if (expired.contains(tombstone)) + if (tombstone instanceof ExpiredRangeTombstone) continue; RangeTombstone updated = new RangeTombstone(firstColumn.name(), tombstone.max, tombstone.data); @@ -186,6 +207,9 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement return size; } + /** + * The total number of atoms written by calls to the method {@link #writeOpenedMarker}. + */ public int writtenAtom() { return atomCount; @@ -193,69 +217,129 @@ public class RangeTombstone extends Interval<ByteBuffer, DeletionTime> implement /** * Update this tracker given an {@code atom}. - * If column is a Column, check if any tracked range is useless and - * can be removed. If it is a RangeTombstone, add it to this tracker. + * <p> + * This method first tests if some range tombstone can be discarded due + * to the knowledge of that new atom. Then, if it's a range tombstone, + * it adds it to the tracker. 
+ * <p> + * Note that this method should be called on *every* atom of a partition for + * the tracker to work as efficiently as possible (#9486). */ public void update(OnDiskAtom atom, boolean isExpired) { - if (atom instanceof RangeTombstone) + // Get rid of now useless RTs + ListIterator<RangeTombstone> iterator = openedTombstones.listIterator(); + while (iterator.hasNext()) { - RangeTombstone t = (RangeTombstone)atom; - // This could be a repeated marker already. If so, we already have a range in which it is - // fully included. While keeping both would be ok functionaly, we could end up with a lot of - // useless marker after a few compaction, so avoid this. - for (RangeTombstone tombstone : maxOrderingSet.tailSet(t)) + // If this tombstone stops before the new atom, it is now useless since it cannot cover this or any future + // atoms. Otherwise, if a RT ends after the new atom, then we know that's true of any following atom too + // since openedTombstones is sorted by end bounds + RangeTombstone t = iterator.next(); + if (comparator.compare(atom.name(), t.max) > 0) { - // We only care about tombstone have the same max than t - if (comparator.compare(t.max, tombstone.max) > 0) - break; - - // Since it is assume tombstones are passed to this method in growing min order, it's enough to - // check for the data to know is the current tombstone is included in a previous one - if (tombstone.data.equals(t.data)) - return; + iterator.remove(); + } + else + { + // If the atom is a RT, we'll add it next and for that we want to start by looking at the atom we just + // returned, so rewind the iterator. + iterator.previous(); + break; } - ranges.addLast(t); - maxOrderingSet.add(t); - if (isExpired) - expired.add(t); } - else + + // If it's a RT, add it. 
+ if (atom instanceof RangeTombstone) { - assert atom instanceof Column; - Iterator<RangeTombstone> iter = maxOrderingSet.iterator(); - while (iter.hasNext()) + RangeTombstone toAdd = (RangeTombstone)atom; + if (isExpired) + toAdd = new ExpiredRangeTombstone(toAdd); + + // We want to maintain openedTombstones in end bounds order so we find where to insert the new element + // and add it. While doing so, we also check if that new tombstone fully shadows or is fully shadowed + // by an existing tombstone so we avoid tracking more tombstones than necessary (and we know this will + // at least happen for start-of-index-block repeated range tombstones). + while (iterator.hasNext()) { - RangeTombstone tombstone = iter.next(); - if (comparator.compare(atom.name(), tombstone.max) > 0) + RangeTombstone existing = iterator.next(); + int cmp = comparator.compare(toAdd.max, existing.max); + if (cmp > 0) { - // That tombstone is now useless - iter.remove(); - ranges.remove(tombstone); + // the new one covers more than the existing one. If the new one happens to also supersede + // the existing one, remove the existing one. In any case, we're not done yet. + if (toAdd.data.supersedes(existing.data)) + iterator.remove(); } else { - // Since we're iterating by growing end bound, if the current range - // includes the column, so does all the next ones + // the new one is included in the existing one. If the new one supersedes the existing one, + // then we add the new one (and if the new one ends like the existing one, we can actually remove + // the existing one), otherwise we can actually ignore it. In any case, we're done. + if (toAdd.data.supersedes(existing.data)) + { + if (cmp == 0) + iterator.set(toAdd); + else + insertBefore(toAdd, iterator); + } return; } } + // If we reach here, either we had no tombstones or the new one ends after all existing ones. 
+ iterator.add(toAdd); } } + /** + * Adds the provided {@code tombstone} _before_ the last element returned by {@code iterator.next()}. + * <p> + * This method assumes that {@code iterator.next()} has been called prior to this method call, i.e. that + * {@code iterator.hasPrevious() == true}. + */ + private static void insertBefore(RangeTombstone tombstone, ListIterator<RangeTombstone> iterator) + { + assert iterator.hasPrevious(); + iterator.previous(); + iterator.add(tombstone); + iterator.next(); + } + + /** + * Tests if the provided column is deleted by one of the tombstones + * tracked by this tracker. + * <p> + * This method should be called on columns in the same order as for the update() + * method. Note that this method does not update the tracker so the update() method + * should still be called on {@code column} (it doesn't matter if update is called + * before or after this call). + */ public boolean isDeleted(Column column) { - for (RangeTombstone tombstone : ranges) + // We know every tombstone kept is "open", i.e. starts before the column. So the + // column is deleted if any of the tracked tombstones ends after the column + // (this will be the case of every RT if update() has been called before this + // method, but we might have a few RTs to skip otherwise) and the RT deletion is + // actually more recent than the column timestamp. + for (RangeTombstone tombstone : openedTombstones) { - if (comparator.compare(column.name(), tombstone.min) >= 0 - && comparator.compare(column.name(), tombstone.max) <= 0 + if (comparator.compare(column.name(), tombstone.max) <= 0 && tombstone.maxTimestamp() >= column.timestamp()) - { return true; - } } return false; } + + /** + * The tracker needs to track expired range tombstones but keep track that they are + * expired, so this is what this class is used for. 
+ */ + private static class ExpiredRangeTombstone extends RangeTombstone + { + private ExpiredRangeTombstone(RangeTombstone tombstone) + { + super(tombstone.min, tombstone.max, tombstone.data); + } + } } public static class Serializer implements ISSTableSerializer<RangeTombstone> http://git-wip-us.apache.org/repos/asf/cassandra/blob/b0dbea3d/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java index f61cc2b..43801c6 100644 --- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java +++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java @@ -304,6 +304,9 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable // not the range tombstones. For that we use the columnIndexer tombstone tracker. if (indexBuilder.tombstoneTracker().isDeleted(reduced)) { + // We skip that column so it won't be passed to the tracker by the index builder. So pass it now to + // make sure we still discard potentially un-needed RTs as soon as possible. + indexBuilder.tombstoneTracker().update(reduced, false); indexer.remove(reduced); return null; }
