http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 670b1ae..e805fd2 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -17,12 +17,12 @@ */ package org.apache.cassandra.db.partitions; -import java.io.DataInputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,99 +31,185 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Sorting; +import org.apache.cassandra.utils.MergeIterator; /** * Stores updates made on a partition. * <p> - * A PartitionUpdate object requires that all writes are performed before we - * try to read the updates (attempts to write to the PartitionUpdate after a - * read method has been called will result in an exception being thrown). - * In other words, a Partition is mutable while we do a write and become - * immutable as soon as it is read. + * A PartitionUpdate object requires that all writes/additions are performed before we + * try to read the updates (attempts to write to the PartitionUpdate after a read method + * has been called will result in an exception being thrown). In other words, a Partition + * is mutable while it's written but becomes immutable as soon as it is read. * <p> - * Row updates are added to this update through the {@link #writer} method which - * returns a {@link Row.Writer}. Multiple rows can be added to this writer as required and - * those row do not have to added in (clustering) order, and the same row can be added - * multiple times. Further, for a given row, the writer actually supports intermingling - * the writing of cells for different complex cells (note that this is usually not supported - * by {@code Row.Writer} implementations, but is supported here because - * {@code ModificationStatement} requires that (because we could have multiple {@link Operation} - * on the same column in a given statement)). + * A typical usage is to create a new update ({@code new PartitionUpdate(metadata, key, columns, capacity)}) + * and then add rows and range tombstones through the {@code add()} methods (the partition + * level deletion time can also be set with {@code addPartitionDeletion()}). However, there + * is also a few static helper constructor methods for special cases ({@code emptyUpdate()}, + * {@code fullPartitionDelete} and {@code singleRowUpdate}). */ -public class PartitionUpdate extends AbstractPartitionData implements Sorting.Sortable +public class PartitionUpdate extends AbstractThreadUnsafePartition { protected static final Logger logger = LoggerFactory.getLogger(PartitionUpdate.class); - // Records whether the partition update has been sorted (it is the rows contained in the partition - // that are sorted since we don't require rows to be added in order). Sorting happens when the - // update is read, and writting is rejected as soon as the update is sorted (it's actually possible - // to manually allow new update by using allowNewUpdates(), and we could make that more implicit, but - // as only triggers really requires it, we keep it simple for now). - private boolean isSorted; - public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer(); - private final Writer writer; + private final int createdAtInSec = FBUtilities.nowInSeconds(); + + // Records whether this update is "built", i.e. if the build() method has been called, which + // happens when the update is read. Further writing is then rejected though a manual call + // to allowNewUpdates() allow new writes. We could make that more implicit but only triggers + // really requires that so we keep it simple for now). + private boolean isBuilt; + private boolean canReOpen = true; + + private final MutableDeletionInfo deletionInfo; + private RowStats stats; // will be null if isn't built + + private Row staticRow = Rows.EMPTY_STATIC_ROW; - // Used by compare for the sake of implementing the Sorting.Sortable interface (which is in turn used - // to sort the rows of this update). - private final InternalReusableClustering p1 = new InternalReusableClustering(); - private final InternalReusableClustering p2 = new InternalReusableClustering(); + private final boolean canHaveShadowedData; private PartitionUpdate(CFMetaData metadata, DecoratedKey key, - DeletionInfo delInfo, - RowDataBlock data, PartitionColumns columns, - int initialRowCapacity) + Row staticRow, + List<Row> rows, + MutableDeletionInfo deletionInfo, + RowStats stats, + boolean isBuilt, + boolean canHaveShadowedData) { - super(metadata, key, delInfo, columns, data, initialRowCapacity); - this.writer = createWriter(); + super(metadata, key, columns, rows); + this.staticRow = staticRow; + this.deletionInfo = deletionInfo; + this.stats = stats; + this.isBuilt = isBuilt; + this.canHaveShadowedData = canHaveShadowedData; } public PartitionUpdate(CFMetaData metadata, DecoratedKey key, - DeletionInfo delInfo, PartitionColumns columns, int initialRowCapacity) { - this(metadata, - key, - delInfo, - new RowDataBlock(columns.regulars, initialRowCapacity, true, metadata.isCounter()), - columns, - initialRowCapacity); + this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true); } - public PartitionUpdate(CFMetaData metadata, - DecoratedKey key, - PartitionColumns columns, - int initialRowCapacity) + /** + * Creates a empty immutable partition update. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the created update. + * + * @return the newly created empty (and immutable) update. + */ + public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key) + { + return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false); + } + + /** + * Creates an immutable partition update that entirely deletes a given partition. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the partition that the created update should delete. + * @param timestamp the timestamp for the deletion. + * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion. + * + * @return the newly created partition deletion update. + */ + public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec) + { + return new PartitionUpdate(metadata, key, PartitionColumns.NONE, Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new MutableDeletionInfo(timestamp, nowInSec), RowStats.NO_STATS, true, false); + } + + /** + * Creates an immutable partition update that contains a single row update. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the partition that the created update should delete. + * @param row the row for the update. + * + * @return the newly created partition update containing only {@code row}. + */ + public static PartitionUpdate singleRowUpdate(CFMetaData metadata, DecoratedKey key, Row row) + { + return row.isStatic() + ? new PartitionUpdate(metadata, key, new PartitionColumns(row.columns(), Columns.NONE), row, Collections.<Row>emptyList(), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false) + : new PartitionUpdate(metadata, key, new PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, Collections.singletonList(row), MutableDeletionInfo.live(), RowStats.NO_STATS, true, false); + } + + /** + * Turns the given iterator into an update. + * + * Warning: this method does not close the provided iterator, it is up to + * the caller to close it. + */ + public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator) + { + CFMetaData metadata = iterator.metadata(); + boolean reversed = iterator.isReverseOrder(); + + List<Row> rows = new ArrayList<>(); + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); + + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + rows.add((Row)unfiltered); + else + deletionBuilder.add((RangeTombstoneMarker)unfiltered); + } + + if (reversed) + Collections.reverse(rows); + + return new PartitionUpdate(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), iterator.stats(), true, false); + } + + public static PartitionUpdate fromIterator(RowIterator iterator) + { + CFMetaData metadata = iterator.metadata(); + boolean reversed = iterator.isReverseOrder(); + + List<Row> rows = new ArrayList<>(); + + RowStats.Collector collector = new RowStats.Collector(); + + while (iterator.hasNext()) + { + Row row = iterator.next(); + rows.add(row); + Rows.collectStats(row, collector); + } + + if (reversed) + Collections.reverse(rows); + + return new PartitionUpdate(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, MutableDeletionInfo.live(), collector.get(), true, false); + } + + protected boolean canHaveShadowedData() { - this(metadata, - key, - DeletionInfo.live(), - columns, - initialRowCapacity); + return canHaveShadowedData; } - protected Writer createWriter() + public Row staticRow() { - return new RegularWriter(); + return staticRow; } - protected StaticWriter createStaticWriter() + public DeletionInfo deletionInfo() { - return new StaticWriter(); + return deletionInfo; } /** @@ -166,7 +252,7 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So { try (DataOutputBuffer out = new DataOutputBuffer()) { - serializer.serialize(update, out, MessagingService.current_version); + serializer.serialize(update, out, version); return ByteBuffer.wrap(out.getData(), 0, out.getLength()); } catch (IOException e) @@ -176,60 +262,6 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So } /** - * Creates a empty immutable partition update. - * - * @param metadata the metadata for the created update. - * @param key the partition key for the created update. - * - * @return the newly created empty (and immutable) update. - */ - public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key) - { - return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 0) - { - public Row.Writer staticWriter() - { - throw new UnsupportedOperationException(); - } - - public Row.Writer writer() - { - throw new UnsupportedOperationException(); - } - - public void addPartitionDeletion(DeletionTime deletionTime) - { - throw new UnsupportedOperationException(); - } - - public void addRangeTombstone(RangeTombstone range) - { - throw new UnsupportedOperationException(); - } - }; - } - - /** - * Creates a partition update that entirely deletes a given partition. - * - * @param metadata the metadata for the created update. - * @param key the partition key for the partition that the created update should delete. - * @param timestamp the timestamp for the deletion. - * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion. - * - * @return the newly created partition deletion update. - */ - public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec) - { - return new PartitionUpdate(metadata, - key, - new DeletionInfo(timestamp, nowInSec), - new RowDataBlock(Columns.NONE, 0, true, metadata.isCounter()), - PartitionColumns.NONE, - 0); - } - - /** * Merges the provided updates, yielding a new update that incorporates all those updates. * * @param updates the collection of updates to merge. This shouldn't be empty. @@ -239,17 +271,30 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So public static PartitionUpdate merge(Collection<PartitionUpdate> updates) { assert !updates.isEmpty(); - if (updates.size() == 1) + final int size = updates.size(); + + if (size == 1) return Iterables.getOnlyElement(updates); - int totalSize = 0; + // Used when merging row to decide of liveness + int nowInSec = FBUtilities.nowInSeconds(); + PartitionColumns.Builder builder = PartitionColumns.builder(); DecoratedKey key = null; CFMetaData metadata = null; + MutableDeletionInfo deletion = MutableDeletionInfo.live(); + Row staticRow = Rows.EMPTY_STATIC_ROW; + List<Iterator<Row>> updateRowIterators = new ArrayList<>(size); + RowStats stats = RowStats.NO_STATS; + for (PartitionUpdate update : updates) { - totalSize += update.rows; builder.addAll(update.columns()); + deletion.add(update.deletionInfo()); + if (!update.staticRow().isEmpty()) + staticRow = staticRow == Rows.EMPTY_STATIC_ROW ? update.staticRow() : Rows.merge(staticRow, update.staticRow(), nowInSec); + updateRowIterators.add(update.iterator()); + stats = stats.mergeWith(update.stats()); if (key == null) key = update.partitionKey(); @@ -262,23 +307,70 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So assert metadata.cfId.equals(update.metadata().cfId); } - // Used when merging row to decide of liveness - int nowInSec = FBUtilities.nowInSeconds(); - PartitionUpdate newUpdate = new PartitionUpdate(metadata, key, builder.build(), totalSize); - for (PartitionUpdate update : updates) + PartitionColumns columns = builder.build(); + + final Row.Merger merger = new Row.Merger(size, nowInSec, columns.regulars); + + Iterator<Row> merged = MergeIterator.get(updateRowIterators, metadata.comparator, new MergeIterator.Reducer<Row, Row>() { - newUpdate.deletionInfo.add(update.deletionInfo); - if (!update.staticRow().isEmpty()) + @Override + public boolean trivialReduceIsTrivial() { - if (newUpdate.staticRow().isEmpty()) - newUpdate.staticRow = update.staticRow().takeAlias(); - else - Rows.merge(newUpdate.staticRow(), update.staticRow(), newUpdate.columns().statics, newUpdate.staticWriter(), nowInSec, SecondaryIndexManager.nullUpdater); + return true; } - for (Row row : update) - row.copyTo(newUpdate.writer); - } - return newUpdate; + + public void reduce(int idx, Row current) + { + merger.add(idx, current); + } + + protected Row getReduced() + { + // Note that while merger.getRow() can theoretically return null, it won't in this case because + // we don't pass an "activeDeletion". + return merger.merge(DeletionTime.LIVE); + } + + @Override + protected void onKeyChange() + { + merger.clear(); + } + }); + + List<Row> rows = new ArrayList<>(); + Iterators.addAll(rows, merged); + + return new PartitionUpdate(metadata, key, columns, staticRow, rows, deletion, stats, true, true); + } + + /** + * Modify this update to set every timestamp for live data to {@code newTimestamp} and + * every deletion timestamp to {@code newTimestamp - 1}. + * + * There is no reason to use that expect on the Paxos code path, where we need ensure that + * anything inserted use the ballot timestamp (to respect the order of update decided by + * the Paxos algorithm). We use {@code newTimestamp - 1} for deletions because tombstones + * always win on timestamp equality and we don't want to delete our own insertions + * (typically, when we overwrite a collection, we first set a complex deletion to delete the + * previous collection before adding new elements. If we were to set that complex deletion + * to the same timestamp that the new elements, it would delete those elements). And since + * tombstones always wins on timestamp equality, using -1 guarantees our deletion will still + * delete anything from a previous update. + */ + public void updateAllTimestamp(long newTimestamp) + { + // We know we won't be updating that update again after this call, and doing is post built is potentially + // slightly more efficient (things are more "compact"). So force a build if it hasn't happened yet. + maybeBuild(); + + deletionInfo.updateAllTimestamp(newTimestamp - 1); + + if (!staticRow.isEmpty()) + staticRow = staticRow.updateAllTimestamp(newTimestamp); + + for (int i = 0; i < rows.size(); i++) + rows.set(i, rows.get(i).updateAllTimestamp(newTimestamp)); } /** @@ -291,7 +383,7 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So */ public int operationCount() { - return rowCount() + return rows.size() + deletionInfo.rangeCount() + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1); } @@ -303,17 +395,29 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So */ public int dataSize() { - int clusteringSize = metadata().comparator.size(); int size = 0; for (Row row : this) { size += row.clustering().dataSize(); - for (Cell cell : row) - size += cell.dataSize(); + for (ColumnData cd : row) + size += cd.dataSize(); } return size; } + @Override + public int rowCount() + { + maybeBuild(); + return super.rowCount(); + } + + public RowStats stats() + { + maybeBuild(); + return stats; + } + /** * If a partition update has been read (and is thus unmodifiable), a call to this method * makes the update modifiable again. @@ -325,21 +429,17 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So */ public synchronized void allowNewUpdates() { + if (!canReOpen) + throw new IllegalStateException("You cannot do more updates on collectCounterMarks has been called"); + // This is synchronized to make extra sure things work properly even if this is // called concurrently with sort() (which should be avoided in the first place, but // better safe than sorry). - isSorted = false; - } - - @Override - public int rowCount() - { - maybeSort(); - return super.rowCount(); + isBuilt = false; } /** - * Returns an iterator that iterators over the rows of this update in clustering order. + * Returns an iterator that iterates over the rows of this update in clustering order. * <p> * Note that this might trigger a sorting of the update, and as such the update will not * be modifiable anymore after this call. @@ -349,14 +449,14 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So @Override public Iterator<Row> iterator() { - maybeSort(); + maybeBuild(); return super.iterator(); } @Override protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) { - maybeSort(); + maybeBuild(); return super.sliceableUnfilteredIterator(columns, reversed); } @@ -370,8 +470,8 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So for (Row row : this) { metadata().comparator.validate(row.clustering()); - for (Cell cell : row) - cell.validate(); + for (ColumnData cd : row) + cd.validate(); } } @@ -382,6 +482,27 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So */ public long maxTimestamp() { + maybeBuild(); + + long maxTimestamp = deletionInfo.maxTimestamp(); + for (Row row : this) + { + maxTimestamp = Math.max(maxTimestamp, row.primaryKeyLivenessInfo().timestamp()); + for (ColumnData cd : row) + { + if (cd.column().isSimple()) + { + maxTimestamp = Math.max(maxTimestamp, ((Cell)cd).timestamp()); + } + else + { + ComplexColumnData complexData = (ComplexColumnData)cd; + maxTimestamp = Math.max(maxTimestamp, complexData.complexDeletion().markedForDeleteAt()); + for (Cell cell : complexData) + maxTimestamp = Math.max(maxTimestamp, cell.timestamp()); + } + } + } return maxTimestamp; } @@ -394,62 +515,73 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So public List<CounterMark> collectCounterMarks() { assert metadata().isCounter(); + maybeBuild(); + // We will take aliases on the rows of this update, and update them in-place. So we should be sure the + // update is no immutable for all intent and purposes. + canReOpen = false; - InternalReusableClustering clustering = new InternalReusableClustering(); List<CounterMark> l = new ArrayList<>(); - int i = 0; - for (Row row : this) + for (Row row : rows) { - for (Cell cell : row) + for (Cell cell : row.cells()) + { if (cell.isCounterCell()) - l.add(new CounterMark(clustering, i, cell.column(), cell.path())); - i++; + l.add(new CounterMark(row, cell.column(), cell.path())); + } } return l; } - /** - * Returns a row writer for the static row of this partition update. - * - * @return a row writer for the static row of this partition update. A partition - * update contains only one static row so only one row should be written through - * this writer (but if multiple rows are added, the latest written one wins). - */ - public Row.Writer staticWriter() + private void assertNotBuilt() { - return createStaticWriter(); + if (isBuilt) + throw new IllegalStateException("An update should not be written again once it has been read"); } - /** - * Returns a row writer to add (non-static) rows to this partition update. - * - * @return a row writer to add (non-static) rows to this partition update. - * Multiple rows can be successively added this way and the rows added do not have - * to be in clustering order. Further, the same row can be added multiple time. - * - */ - public Row.Writer writer() + public void addPartitionDeletion(DeletionTime deletionTime) { - if (isSorted) - throw new IllegalStateException("An update should not written again once it has been read"); + assertNotBuilt(); + deletionInfo.add(deletionTime); + } - return writer; + public void add(RangeTombstone range) + { + assertNotBuilt(); + deletionInfo.add(range, metadata.comparator); } /** - * Returns a range tombstone marker writer to add range tombstones to this - * partition update. - * <p> - * Note that if more convenient, range tombstones can also be added using - * {@link addRangeTombstone}. + * Adds a row to this update. + * + * There is no particular assumption made on the order of row added to a partition update. It is further + * allowed to add the same row (more precisely, multiple row objects for the same clustering). * - * @param isReverseOrder whether the range tombstone marker will be provided to the returned writer - * in clustering order or in reverse clustering order. - * @return a range tombstone marker writer to add range tombstones to this update. + * Note however that the columns contained in the added row must be a subset of the columns used when + * creating this update. + * + * @param row the row to add. */ - public RangeTombstoneMarker.Writer markerWriter(boolean isReverseOrder) + public void add(Row row) { - return new RangeTombstoneCollector(isReverseOrder); + if (row.isEmpty()) + return; + + assertNotBuilt(); + + if (row.isStatic()) + { + // We test for == first because in most case it'll be true and that is faster + assert columns().statics == row.columns() || columns().statics.contains(row.columns()); + staticRow = staticRow.isEmpty() + ? row + : Rows.merge(staticRow, row, createdAtInSec); + } + else + { + // We test for == first because in most case it'll be true and that is faster + assert columns().regulars == row.columns() || columns().regulars.contains(row.columns()); + rows.add(row); + } } /** @@ -459,160 +591,70 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So */ public int size() { - return rows; + return rows.size(); } - private void maybeSort() + private void maybeBuild() { - if (isSorted) + if (isBuilt) return; - sort(); + build(); } - private synchronized void sort() + private synchronized void build() { - if (isSorted) + if (isBuilt) return; - if (rows <= 1) + if (rows.size() <= 1) { - isSorted = true; + finishBuild(); return; } - // Sort the rows - will still potentially contain duplicate (non-reconciled) rows - Sorting.sort(this); + Comparator<Row> comparator = metadata.comparator.rowComparator(); + // Sort the rows. Because the same row can have been added multiple times, we can still have duplicates after that + Collections.sort(rows, comparator); - // Now find duplicates and merge them together + // Now find the duplicates and merge them together int previous = 0; // The last element that was set - int nowInSec = FBUtilities.nowInSeconds(); - for (int current = 1; current < rows; current++) + for (int current = 1; current < rows.size(); current++) { // There is really only 2 possible comparison: < 0 or == 0 since we've sorted already - int cmp = compare(previous, current); + Row previousRow = rows.get(previous); + Row currentRow = rows.get(current); + int cmp = comparator.compare(previousRow, currentRow); if (cmp == 0) { // current and previous are the same row. Merge current into previous // (and so previous + 1 will be "free"). - merge(current, previous, nowInSec); + rows.set(previous, Rows.merge(previousRow, currentRow, createdAtInSec)); } else { - // data[current] != [previous], so move current just after previous if needs be + // current != previous, so move current just after previous if needs be ++previous; if (previous != current) - move(current, previous); + rows.set(previous, currentRow); } } // previous is on the last value to keep - rows = previous + 1; - - isSorted = true; - } + for (int j = rows.size() - 1; j > previous; j--) + rows.remove(j); - /** - * This method is note meant to be used externally: it is only public so this - * update conform to the {@link Sorting.Sortable} interface. - */ - public int compare(int i, int j) - { - return metadata.comparator.compare(p1.setTo(i), p2.setTo(j)); - } - - protected class StaticWriter extends StaticRow.Builder - { - protected StaticWriter() - { - super(columns.statics, false, metadata().isCounter()); - } - - @Override - public void endOfRow() - { - super.endOfRow(); - if (staticRow == null) - { - staticRow = build(); - } - else - { - StaticRow.Builder builder = StaticRow.builder(columns.statics, true, metadata().isCounter()); - Rows.merge(staticRow, build(), columns.statics, builder, FBUtilities.nowInSeconds()); - staticRow = builder.build(); - } - } + finishBuild(); } - protected class RegularWriter extends Writer + private void finishBuild() { - // For complex column, the writer assumptions is that for a given row, cells of different - // complex columns are not intermingled (they also should be in cellPath order). We however - // don't yet guarantee that this will be the case for updates (both UpdateStatement and - // RowUpdateBuilder can potentially break that assumption; we could change those classes but - // that's non trivial, at least for UpdateStatement). - // To deal with that problem, we record which complex columns have been updated (for the current - // row) and if we detect a violation of our assumption, we switch the row we're writing - // into (which is ok because everything will be sorted and merged in maybeSort()). - private final Set<ColumnDefinition> updatedComplex = new HashSet(); - private ColumnDefinition lastUpdatedComplex; - private CellPath lastUpdatedComplexPath; - - public RegularWriter() - { - super(false); - } - - @Override - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - if (column.isComplex()) - { - if (updatedComplex.contains(column) - && (!column.equals(lastUpdatedComplex) || (column.cellPathComparator().compare(path, lastUpdatedComplexPath)) <= 0)) - { - // We've updated that complex already, but we've either updated another complex or it's not in order: as this - // break the writer assumption, switch rows. - endOfRow(); - - // Copy the clustering values from the previous row - int clusteringSize = metadata.clusteringColumns().size(); - int base = (row - 1) * clusteringSize; - for (int i = 0; i < clusteringSize; i++) - writer.writeClusteringValue(clusterings[base + i]); - - updatedComplex.clear(); - } - - lastUpdatedComplex = column; - lastUpdatedComplexPath = path; - updatedComplex.add(column); - } - super.writeCell(column, isCounter, value, info, path); - } - - @Override - public void endOfRow() - { - super.endOfRow(); - clear(); - } - - @Override - public Writer reset() - { - super.reset(); - clear(); - return this; - } - - private void clear() - { - updatedComplex.clear(); - lastUpdatedComplex = null; - lastUpdatedComplexPath = null; - } + RowStats.Collector collector = new RowStats.Collector(); + deletionInfo.collectStats(collector); + for (Row row : rows) + Rows.collectStats(row, collector); + stats = collector.get(); + isBuilt = true; } public static class PartitionUpdateSerializer @@ -648,7 +690,7 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) { assert !iter.isReverseOrder(); - UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows.size()); } } @@ -666,37 +708,46 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); int size = in.readInt(); Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); - SerializationHelper helper = new SerializationHelper(version, flag); + SerializationHelper helper = new SerializationHelper(metadata, version, flag); try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper)) { - return UnfilteredRowIterators.toUpdate(iterator); + return PartitionUpdate.fromIterator(iterator); } } assert key == null; // key is only there for the old format - UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag); - if (h.isEmpty) - return emptyUpdate(h.metadata, h.key); + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag); + if (header.isEmpty) + return emptyUpdate(header.metadata, header.key); - assert !h.isReversed; - assert h.rowEstimate >= 0; - PartitionUpdate upd = new PartitionUpdate(h.metadata, - h.key, - new DeletionInfo(h.partitionDeletion), - new RowDataBlock(h.sHeader.columns().regulars, h.rowEstimate, false, h.metadata.isCounter()), - h.sHeader.columns(), - h.rowEstimate); + assert !header.isReversed; + assert header.rowEstimate >= 0; - upd.staticRow = h.staticRow; + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false); + List<Row> rows = new ArrayList<>(header.rowEstimate); - RangeTombstoneMarker.Writer markerWriter = upd.markerWriter(false); - UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(version, flag), h.sHeader, upd.writer(), markerWriter); - - // Mark sorted after we're read it all since that's what we use in the writer() method to detect bad uses - upd.isSorted = true; + try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag, header)) + { + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + rows.add((Row)unfiltered); + else + deletionBuilder.add((RangeTombstoneMarker)unfiltered); + } + } - return upd; + return new PartitionUpdate(header.metadata, + header.key, + header.sHeader.columns(), + header.staticRow, + rows, + deletionBuilder.build(), + header.sHeader.stats(), + true, + false); } public long serializedSize(PartitionUpdate update, int version) @@ -719,7 +770,7 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) { - return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows); + return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows.size()); } } } @@ -729,16 +780,14 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So * us to update the counter value based on the pre-existing value read during the read-before-write that counters * do. See {@link CounterMutation} to understand how this is used. */ - public class CounterMark + public static class CounterMark { - private final InternalReusableClustering clustering; - private final int row; + private final Row row; private final ColumnDefinition column; private final CellPath path; - private CounterMark(InternalReusableClustering clustering, int row, ColumnDefinition column, CellPath path) + private CounterMark(Row row, ColumnDefinition column, CellPath path) { - this.clustering = clustering; this.row = row; this.column = column; this.path = path; @@ -746,7 +795,7 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So public Clustering clustering() { - return clustering.setTo(row); + return row.clustering(); } public ColumnDefinition column() @@ -761,12 +810,17 @@ public class PartitionUpdate extends AbstractPartitionData implements Sorting.So public ByteBuffer value() { - return data.getValue(row, column, path); + return path == null + ? row.getCell(column).value() + : row.getCell(column, path).value(); } public void setValue(ByteBuffer value) { - data.setValue(row, column, path, value); + // This is a bit of a giant hack as this is the only place where we mutate a Row object. This makes it more efficient + // for counters however and this won't be needed post-#6506 so that's probably fine. + assert row instanceof ArrayBackedRow; + ((ArrayBackedRow)row).setValue(column, path, value); } } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java new file mode 100644 index 0000000..492fe1d --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PurgingPartitionIterator.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; + +public abstract class PurgingPartitionIterator extends WrappingUnfilteredPartitionIterator +{ + private final DeletionPurger purger; + private final int gcBefore; + + private UnfilteredRowIterator next; + + public PurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore) + { + super(iterator); + this.gcBefore = gcBefore; + this.purger = new DeletionPurger() + { + public boolean shouldPurge(long timestamp, int localDeletionTime) + { + return timestamp < getMaxPurgeableTimestamp() && localDeletionTime < gcBefore; + } + }; + } + + protected abstract long getMaxPurgeableTimestamp(); + + // Called at the beginning of each new partition + protected void onNewPartition(DecoratedKey partitionKey) + { + } + + // Called for each partition that had only purged infos and are empty post-purge. + protected void onEmptyPartitionPostPurge(DecoratedKey partitionKey) + { + } + + // Called for every unfiltered. Meant for CompactionIterator to update progress + protected void updateProgress() + { + } + + @Override + public boolean hasNext() + { + while (next == null && super.hasNext()) + { + UnfilteredRowIterator iterator = super.next(); + onNewPartition(iterator.partitionKey()); + + UnfilteredRowIterator purged = purge(iterator); + if (isForThrift() || !purged.isEmpty()) + { + next = purged; + return true; + } + + onEmptyPartitionPostPurge(purged.partitionKey()); + } + return next != null; + } + + @Override + public UnfilteredRowIterator next() + { + UnfilteredRowIterator toReturn = next; + next = null; + return toReturn; + } + + private UnfilteredRowIterator purge(final UnfilteredRowIterator iter) + { + return new AlteringUnfilteredRowIterator(iter) + { + @Override + public DeletionTime partitionLevelDeletion() + { + DeletionTime dt = iter.partitionLevelDeletion(); + return purger.shouldPurge(dt) ? DeletionTime.LIVE : dt; + } + + @Override + public Row computeNextStatic(Row row) + { + return row.purge(purger, gcBefore); + } + + @Override + public Row computeNext(Row row) + { + return row.purge(purger, gcBefore); + } + + @Override + public RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + boolean reversed = isReverseOrder(); + if (marker.isBoundary()) + { + // We can only skip the whole marker if both deletion time are purgeable. + // If only one of them is, filterTombstoneMarker will deal with it. + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; + boolean shouldPurgeClose = purger.shouldPurge(boundary.closeDeletionTime(reversed)); + boolean shouldPurgeOpen = purger.shouldPurge(boundary.openDeletionTime(reversed)); + + if (shouldPurgeClose) + { + if (shouldPurgeOpen) + return null; + + return boundary.createCorrespondingOpenMarker(reversed); + } + + return shouldPurgeOpen + ? boundary.createCorrespondingCloseMarker(reversed) + : marker; + } + else + { + return purger.shouldPurge(((RangeTombstoneBoundMarker)marker).deletionTime()) ? null : marker; + } + } + + @Override + public Unfiltered next() + { + Unfiltered next = super.next(); + updateProgress(); + return next; + } + }; + } +}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java deleted file mode 100644 index 10022eb..0000000 --- a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; - -public abstract class TombstonePurgingPartitionIterator extends FilteringPartitionIterator -{ - private final int gcBefore; - - public TombstonePurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore) - { - super(iterator); - this.gcBefore = gcBefore; - } - - protected abstract long getMaxPurgeableTimestamp(); - - protected FilteringRow makeRowFilter() - { - return new FilteringRow() - { - @Override - protected boolean include(LivenessInfo info) - { - return !info.hasLocalDeletionTime() || !info.isPurgeable(getMaxPurgeableTimestamp(), gcBefore); - } - - @Override - protected boolean include(DeletionTime dt) - { - return includeDelTime(dt); - } - - @Override - protected boolean include(ColumnDefinition c, DeletionTime dt) - { - return includeDelTime(dt); - } - }; - } - - private boolean includeDelTime(DeletionTime dt) - { - return dt.isLive() || !dt.isPurgeable(getMaxPurgeableTimestamp(), gcBefore); - } - - @Override - protected boolean includePartitionDeletion(DeletionTime dt) - { - return includeDelTime(dt); - } - - @Override - protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) - { - if (marker.isBoundary()) - { - // We can only skip the whole marker if both deletion time are purgeable. - // If only one of them is, filterTombstoneMarker will deal with it. - RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; - return includeDelTime(boundary.endDeletionTime()) || includeDelTime(boundary.startDeletionTime()); - } - else - { - return includeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime()); - } - } - - @Override - protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) - { - if (!marker.isBoundary()) - return marker; - - // Note that we know this is called after includeRangeTombstoneMarker. So if one of the deletion time is - // purgeable, we know the other one isn't. - RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; - if (!(includeDelTime(boundary.closeDeletionTime(reversed)))) - return boundary.createCorrespondingCloseBound(reversed); - else if (!(includeDelTime(boundary.openDeletionTime(reversed)))) - return boundary.createCorrespondingOpenBound(reversed); - return boundary; - } - -}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 0d3d364..4414f44 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -19,13 +19,10 @@ package org.apache.cassandra.db.partitions; import java.io.IOError; import java.io.IOException; -import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.*; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.util.DataInputPlus; @@ -33,25 +30,14 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.MergeIterator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Static methods to work with partition iterators. */ public abstract class UnfilteredPartitionIterators { - private static final Logger logger = LoggerFactory.getLogger(UnfilteredPartitionIterators.class); - private static final Serializer serializer = new Serializer(); - private static final Comparator<UnfilteredRowIterator> partitionComparator = new Comparator<UnfilteredRowIterator>() - { - public int compare(UnfilteredRowIterator p1, UnfilteredRowIterator p2) - { - return p1.partitionKey().compareTo(p2.partitionKey()); - } - }; + private static final Comparator<UnfilteredRowIterator> partitionComparator = (p1, p2) -> p1.partitionKey().compareTo(p2.partitionKey()); public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator() { @@ -242,28 +228,6 @@ public abstract class UnfilteredPartitionIterators }; } - /** - * Convert all expired cells to equivalent tombstones. - * <p> - * See {@link UnfilteredRowIterators#convertExpiredCellsToTombstones} for details. - * - * @param iterator the iterator in which to conver expired cells. - * @param nowInSec the current time to use to decide if a cell is expired. - * @return an iterator that returns the same data than {@code iterator} but with all expired cells converted - * to equivalent tombstones. - */ - public static UnfilteredPartitionIterator convertExpiredCellsToTombstones(UnfilteredPartitionIterator iterator, final int nowInSec) - { - return new WrappingUnfilteredPartitionIterator(iterator) - { - @Override - protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) - { - return UnfilteredRowIterators.convertExpiredCellsToTombstones(iter, nowInSec); - } - }; - } - public static UnfilteredPartitionIterator mergeLazily(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec) { assert !iterators.isEmpty(); @@ -330,52 +294,6 @@ public abstract class UnfilteredPartitionIterators }; } - public static UnfilteredPartitionIterator removeDroppedColumns(UnfilteredPartitionIterator iterator, final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns) - { - return new FilteringPartitionIterator(iterator) - { - @Override - protected FilteringRow makeRowFilter() - { - return new FilteringRow() - { - @Override - protected boolean include(Cell cell) - { - return include(cell.column(), cell.livenessInfo().timestamp()); - } - - @Override - protected boolean include(ColumnDefinition c, DeletionTime dt) - { - return include(c, dt.markedForDeleteAt()); - } - - private boolean include(ColumnDefinition column, long timestamp) - { - CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes); - return dropped == null || timestamp > dropped.droppedTime; - } - }; - } - - @Override - protected boolean shouldFilter(UnfilteredRowIterator iterator) - { - // TODO: We could have row iterators return the smallest timestamp they might return - // (which we can get from sstable stats), and ignore any dropping if that smallest - // timestamp is bigger that the biggest droppedColumns timestamp. - - // If none of the dropped columns is part of the columns that the iterator actually returns, there is nothing to do; - for (ColumnDefinition c : iterator.columns()) - if (droppedColumns.containsKey(c.name.bytes)) - return true; - - return false; - } - }; - } - public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest) { try (UnfilteredPartitionIterator iter = iterator) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index c003d6f..807741a 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -34,30 +34,12 @@ import org.apache.cassandra.utils.FBUtilities; */ public abstract class AbstractCell implements Cell { - public boolean isLive(int nowInSec) - { - return livenessInfo().isLive(nowInSec); - } - - public boolean isTombstone() - { - return livenessInfo().hasLocalDeletionTime() && !livenessInfo().hasTTL(); - } - - public boolean isExpiring() - { - return livenessInfo().hasTTL(); - } - - public void writeTo(Row.Writer writer) - { - writer.writeCell(column(), isCounterCell(), value(), livenessInfo(), path()); - } - public void digest(MessageDigest digest) { digest.update(value().duplicate()); - livenessInfo().digest(digest); + FBUtilities.updateWithLong(digest, timestamp()); + FBUtilities.updateWithInt(digest, localDeletionTime()); + FBUtilities.updateWithInt(digest, ttl()); FBUtilities.updateWithBoolean(digest, isCounterCell()); if (path() != null) path().digest(digest); @@ -67,7 +49,12 @@ public abstract class AbstractCell implements Cell { column().validateCellValue(value()); - livenessInfo().validate(); + if (ttl() < 0) + throw new MarshalException("A TTL should not be negative"); + if (localDeletionTime() < 0) + throw new MarshalException("A local deletion time should not be negative"); + if (isExpiring() && localDeletionTime() == NO_DELETION_TIME) + throw new MarshalException("Shoud not have a TTL without an associated local deletion time"); // If cell is a tombstone, it shouldn't have a value. if (isTombstone() && value().hasRemaining()) @@ -77,59 +64,58 @@ public abstract class AbstractCell implements Cell column().validateCellPath(path()); } - public int dataSize() - { - int size = value().remaining() + livenessInfo().dataSize(); - if (path() != null) - size += path().dataSize(); - return size; - - } - @Override public boolean equals(Object other) { + if (this == other) + return true; + if(!(other instanceof Cell)) return false; Cell that = (Cell)other; return this.column().equals(that.column()) && this.isCounterCell() == that.isCounterCell() + && this.timestamp() == that.timestamp() + && this.ttl() == that.ttl() + && this.localDeletionTime() == that.localDeletionTime() && Objects.equals(this.value(), that.value()) - && Objects.equals(this.livenessInfo(), that.livenessInfo()) && Objects.equals(this.path(), that.path()); } @Override public int hashCode() { - return Objects.hash(column(), isCounterCell(), value(), livenessInfo(), path()); + return Objects.hash(column(), isCounterCell(), timestamp(), ttl(), localDeletionTime(), value(), path()); } @Override public String toString() { if (isCounterCell()) - return String.format("[%s=%d ts=%d]", column().name, CounterContext.instance().total(value()), livenessInfo().timestamp()); + return String.format("[%s=%d ts=%d]", column().name, CounterContext.instance().total(value()), timestamp()); AbstractType<?> type = column().type; if (type instanceof CollectionType && type.isMultiCell()) { CollectionType ct = (CollectionType)type; - return String.format("[%s[%s]=%s info=%s]", + return String.format("[%s[%s]=%s %s]", column().name, ct.nameComparator().getString(path().get(0)), ct.valueComparator().getString(value()), - livenessInfo()); + livenessInfoString()); } - return String.format("[%s=%s info=%s]", column().name, type.getString(value()), livenessInfo()); + return String.format("[%s=%s %s]", column().name, type.getString(value()), livenessInfoString()); } - public Cell takeAlias() + private String livenessInfoString() { - // Cell is always used as an Aliasable object but as the code currently - // never need to alias a cell outside of its valid scope, we don't yet - // need that. - throw new UnsupportedOperationException(); + if (isExpiring()) + return String.format("ts=%d ttl=%d ldt=%d", timestamp(), ttl(), localDeletionTime()); + else if (isTombstone()) + return String.format("ts=%d ldt=%d", timestamp(), localDeletionTime()); + else + return String.format("ts=%d", timestamp()); } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java index d8256fc..e90e52b 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java @@ -41,6 +41,21 @@ public abstract class AbstractRangeTombstoneMarker implements RangeTombstoneMark return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; } + public boolean isBoundary() + { + return bound.isBoundary(); + } + + public boolean isOpen(boolean reversed) + { + return bound.isOpen(reversed); + } + + public boolean isClose(boolean reversed) + { + return bound.isClose(reversed); + } + public void validateData(CFMetaData metadata) { Slice.Bound bound = clustering(); @@ -56,16 +71,4 @@ public abstract class AbstractRangeTombstoneMarker implements RangeTombstoneMark { return toString(metadata); } - - protected void copyBoundTo(RangeTombstoneMarker.Writer writer) - { - for (int i = 0; i < bound.size(); i++) - writer.writeClusteringValue(bound.get(i)); - writer.writeBoundKind(bound.kind()); - } - - public Unfiltered takeAlias() - { - return this; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java deleted file mode 100644 index 03aeb88..0000000 --- a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.util.Iterator; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.SearchIterator; - -public abstract class AbstractReusableRow extends AbstractRow -{ - private CellData.ReusableCell simpleCell; - private ComplexRowDataBlock.ReusableIterator complexCells; - private DeletionTimeArray.Cursor complexDeletionCursor; - private RowDataBlock.ReusableIterator iterator; - - public AbstractReusableRow() - { - } - - protected abstract int row(); - protected abstract RowDataBlock data(); - - private CellData.ReusableCell simpleCell() - { - if (simpleCell == null) - simpleCell = SimpleRowDataBlock.reusableCell(); - return simpleCell; - } - - private ComplexRowDataBlock.ReusableIterator complexCells() - { - if (complexCells == null) - complexCells = ComplexRowDataBlock.reusableComplexCells(); - return complexCells; - } - - private DeletionTimeArray.Cursor complexDeletionCursor() - { - if (complexDeletionCursor == null) - complexDeletionCursor = ComplexRowDataBlock.complexDeletionCursor(); - return complexDeletionCursor; - } - - private RowDataBlock.ReusableIterator reusableIterator() - { - if (iterator == null) - iterator = RowDataBlock.reusableIterator(); - return iterator; - } - - public Columns columns() - { - return data().columns(); - } - - public Cell getCell(ColumnDefinition c) - { - assert !c.isComplex(); - if (data().simpleData == null) - return null; - - int idx = columns().simpleIdx(c, 0); - if (idx < 0) - return null; - - return simpleCell().setTo(data().simpleData.data, c, (row() * columns().simpleColumnCount()) + idx); - } - - public Cell getCell(ColumnDefinition c, CellPath path) - { - assert c.isComplex(); - - ComplexRowDataBlock data = data().complexData; - if (data == null) - return null; - - int idx = data.cellIdx(row(), c, path); - if (idx < 0) - return null; - - return simpleCell().setTo(data.cellData(row()), c, idx); - } - - public Iterator<Cell> getCells(ColumnDefinition c) - { - assert c.isComplex(); - return complexCells().setTo(data().complexData, row(), c); - } - - public boolean hasComplexDeletion() - { - return data().hasComplexDeletion(row()); - } - - public DeletionTime getDeletion(ColumnDefinition c) - { - assert c.isComplex(); - if (data().complexData == null) - return DeletionTime.LIVE; - - int idx = data().complexData.complexDeletionIdx(row(), c); - return idx < 0 - ? DeletionTime.LIVE - : complexDeletionCursor().setTo(data().complexData.complexDelTimes, idx); - } - - public Iterator<Cell> iterator() - { - return reusableIterator().setTo(data(), row()); - } - - public SearchIterator<ColumnDefinition, ColumnData> searchIterator() - { - return new SearchIterator<ColumnDefinition, ColumnData>() - { - private int simpleIdx = 0; - - public boolean hasNext() - { - // TODO: we can do better, but we expect users to no rely on this anyway - return true; - } - - public ColumnData next(ColumnDefinition column) - { - if (column.isComplex()) - { - // TODO: this is sub-optimal - - Iterator<Cell> cells = getCells(column); - return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column)); - } - else - { - int idx = columns().simpleIdx(column, simpleIdx); - if (idx < 0) - return null; - - Cell cell = simpleCell().setTo(data().simpleData.data, column, (row() * columns().simpleColumnCount()) + idx); - simpleIdx = idx + 1; - return cell == null ? null : new ColumnData(column, cell, null, null); - } - } - }; - } - - public Row takeAlias() - { - final Clustering clustering = clustering().takeAlias(); - final LivenessInfo info = primaryKeyLivenessInfo().takeAlias(); - final DeletionTime deletion = deletion().takeAlias(); - - final RowDataBlock data = data(); - final int row = row(); - - return new AbstractReusableRow() - { - protected RowDataBlock data() - { - return data; - } - - protected int row() - { - return row; - } - - public Clustering clustering() - { - return clustering; - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return info; - } - - public DeletionTime deletion() - { - return deletion; - } - - @Override - public Row takeAlias() - { - return this; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java index a99bc78..807d805 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -18,11 +18,11 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; import java.security.MessageDigest; -import java.util.Iterator; import java.util.Objects; +import com.google.common.collect.Iterables; + import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.CollectionType; import org.apache.cassandra.serializers.MarshalException; @@ -46,19 +46,14 @@ public abstract class AbstractRow implements Row if (primaryKeyLivenessInfo().isLive(nowInSec)) return true; - for (Cell cell : this) - if (cell.isLive(nowInSec)) - return true; - - return false; + return Iterables.any(cells(), cell -> cell.isLive(nowInSec)); } public boolean isEmpty() { - return !primaryKeyLivenessInfo().hasTimestamp() + return primaryKeyLivenessInfo().isEmpty() && deletion().isLive() - && !iterator().hasNext() - && !hasComplexDeletion(); + && !iterator().hasNext(); } public boolean isStatic() @@ -74,36 +69,8 @@ public abstract class AbstractRow implements Row deletion().digest(digest); primaryKeyLivenessInfo().digest(digest); - Iterator<ColumnDefinition> iter = columns().complexColumns(); - while (iter.hasNext()) - getDeletion(iter.next()).digest(digest); - - for (Cell cell : this) - cell.digest(digest); - } - - /** - * Copy this row to the provided writer. - * - * @param writer the row writer to write this row to. - */ - public void copyTo(Row.Writer writer) - { - Rows.writeClustering(clustering(), writer); - writer.writePartitionKeyLivenessInfo(primaryKeyLivenessInfo()); - writer.writeRowDeletion(deletion()); - - for (Cell cell : this) - cell.writeTo(writer); - - for (int i = 0; i < columns().complexColumnCount(); i++) - { - ColumnDefinition c = columns().getComplex(i); - DeletionTime dt = getDeletion(c); - if (!dt.isLive()) - writer.writeComplexDeletion(c, dt); - } - writer.endOfRow(); + for (ColumnData cd : this) + cd.digest(digest); } public void validateData(CFMetaData metadata) @@ -120,8 +87,8 @@ public abstract class AbstractRow implements Row if (deletion().localDeletionTime() < 0) throw new MarshalException("A local deletion time should not be negative"); - for (Cell cell : this) - cell.validate(); + for (ColumnData cd : this) + cd.validate(); } public String toString(CFMetaData metadata) @@ -142,33 +109,43 @@ public abstract class AbstractRow implements Row } sb.append(": ").append(clustering().toString(metadata)).append(" | "); boolean isFirst = true; - ColumnDefinition prevColumn = null; - for (Cell cell : this) + for (ColumnData cd : this) { if (isFirst) isFirst = false; else sb.append(", "); if (fullDetails) { - if (cell.column().isComplex() && !cell.column().equals(prevColumn)) + if (cd.column().isSimple()) { - DeletionTime complexDel = getDeletion(cell.column()); - if (!complexDel.isLive()) - sb.append("del(").append(cell.column().name).append(")=").append(complexDel).append(", "); + sb.append(cd); + } + else + { + ComplexColumnData complexData = (ComplexColumnData)cd; + if (!complexData.complexDeletion().isLive()) + sb.append("del(").append(cd.column().name).append(")=").append(complexData.complexDeletion()); + for (Cell cell : complexData) + sb.append(", ").append(cell); } - sb.append(cell); - prevColumn = cell.column(); } else { - sb.append(cell.column().name); - if (cell.column().type instanceof CollectionType) + if (cd.column().isSimple()) { - CollectionType ct = (CollectionType)cell.column().type; - sb.append("[").append(ct.nameComparator().getString(cell.path().get(0))).append("]"); - sb.append("=").append(ct.valueComparator().getString(cell.value())); + Cell cell = (Cell)cd; + sb.append(cell.column().name).append('=').append(cell.column().type.getString(cell.value())); } else { - sb.append("=").append(cell.column().type.getString(cell.value())); + ComplexColumnData complexData = (ComplexColumnData)cd; + CollectionType ct = (CollectionType)cd.column().type; + sb.append(cd.column().name).append("={"); + int i = 0; + for (Cell cell : complexData) + { + sb.append(i++ == 0 ? "" : ", "); + sb.append(ct.nameComparator().getString(cell.path().get(0))).append("->").append(ct.valueComparator().getString(cell.value())); + } + sb.append('}'); } } } @@ -188,22 +165,15 @@ public abstract class AbstractRow implements Row || !this.deletion().equals(that.deletion())) return false; - Iterator<Cell> thisCells = this.iterator(); - Iterator<Cell> thatCells = that.iterator(); - while (thisCells.hasNext()) - { - if (!thatCells.hasNext() || !thisCells.next().equals(thatCells.next())) - return false; - } - return !thatCells.hasNext(); + return Iterables.elementsEqual(this, that); } @Override public int hashCode() { int hash = Objects.hash(clustering(), columns(), primaryKeyLivenessInfo(), deletion()); - for (Cell cell : this) - hash += 31 * cell.hashCode(); + for (ColumnData cd : this) + hash += 31 * cd.hashCode(); return hash; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java index 5bfd1a3..b4f849a 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractUnfilteredRowIterator.java @@ -90,18 +90,4 @@ public abstract class AbstractUnfilteredRowIterator extends AbstractIterator<Unf public void close() { } - - public static boolean equal(UnfilteredRowIterator a, UnfilteredRowIterator b) - { - return Objects.equals(a.columns(), b.columns()) - && Objects.equals(a.metadata(), b.metadata()) - && Objects.equals(a.isReverseOrder(), b.isReverseOrder()) - && Objects.equals(a.partitionKey(), b.partitionKey()) - && Objects.equals(a.partitionLevelDeletion(), b.partitionLevelDeletion()) - && Objects.equals(a.staticRow(), b.staticRow()) - && Objects.equals(a.stats(), b.stats()) - && Objects.equals(a.metadata(), b.metadata()) - && Iterators.elementsEqual(a, b); - } - } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java new file mode 100644 index 0000000..a390bad --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.NoSuchElementException; + +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * Class that makes it easier to write unfiltered iterators that filter or modify + * the returned unfiltered. + * + * The methods you want to override are {@code computeNextStatic} and the {@code computeNext} methods. + * All of these methods are allowed to return a {@code null} value with the meaning of ignoring + * the entry. + */ +public abstract class AlteringUnfilteredRowIterator extends WrappingUnfilteredRowIterator +{ + private Row staticRow; + private Unfiltered next; + + protected AlteringUnfilteredRowIterator(UnfilteredRowIterator wrapped) + { + super(wrapped); + } + + protected Row computeNextStatic(Row row) + { + return row; + } + + protected Row computeNext(Row row) + { + return row; + } + + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + return marker; + } + + public Row staticRow() + { + if (staticRow == null) + { + Row row = computeNextStatic(super.staticRow()); + staticRow = row == null ? Rows.EMPTY_STATIC_ROW : row; + } + return staticRow; + } + + public boolean hasNext() + { + while (next == null && super.hasNext()) + { + Unfiltered unfiltered = super.next(); + if (unfiltered.isRow()) + { + Row row = computeNext((Row)unfiltered); + if (row != null && !row.isEmpty()) + next = row; + } + else + { + next = computeNext((RangeTombstoneMarker)unfiltered); + } + } + return next != null; + } + + public Unfiltered next() + { + if (!hasNext()) + throw new NoSuchElementException(); + + Unfiltered toReturn = next; + next = null; + return toReturn; + } +}
