http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
new file mode 100644
index 0000000..2c71cf3
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -0,0 +1,770 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.security.MessageDigest;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.IMergeIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+/**
+ * Static methods to work with atom iterators.
+ */
+public abstract class UnfilteredRowIterators
+{
+    private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class);
+
+    private UnfilteredRowIterators() {}
+
+    public interface MergeListener
+    {
+        public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions);
+
+        public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions);
+        public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions);
+        public void onMergedCells(Cell mergedCell, Cell[] versions);
+        public void onRowDone();
+
+        public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions);
+
+        public void close();
+    }
+
+    /**
+     * Returns an iterator that only returns rows with live content.
+     *
+     * This is mainly used in the CQL layer when we know we don't care about deletion
+     * infos (and since an UnfilteredRowIterator cannot shadow its own data, we know everything
+     * returned isn't shadowed by a tombstone).
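     * <p>
     * Caller sketch (illustration only; {@code unfiltered} and {@code process} are
     * placeholders for an open iterator and some per-row consumer):
     * <pre>
     *     RowIterator live = UnfilteredRowIterators.filter(unfiltered, nowInSec);
     *     try
     *     {
     *         while (live.hasNext())
     *             process(live.next()); // rows here carry only live cells, no tombstones
     *     }
     *     finally
     *     {
     *         live.close();
     *     }
     * </pre>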
+ */ + public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec) + { + return new FilteringIterator(iter, nowInSec); + + } + + /** + * Returns an iterator that is the result of merging other iterators. + */ + public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec) + { + assert !iterators.isEmpty(); + if (iterators.size() == 1) + return iterators.get(0); + + return UnfilteredRowMergeIterator.create(iterators, nowInSec, null); + } + + /** + * Returns an iterator that is the result of merging other iterators, and using + * specific MergeListener. + * + * Note that this method assumes that there is at least 2 iterators to merge. + */ + public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener) + { + assert mergeListener != null; + return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener); + } + + /** + * Returns an empty atom iterator for a given partition. + */ + public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder) + { + return new UnfilteredRowIterator() + { + public CFMetaData metadata() + { + return cfm; + } + + public boolean isReverseOrder() + { + return isReverseOrder; + } + + public PartitionColumns columns() + { + return PartitionColumns.NONE; + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public DeletionTime partitionLevelDeletion() + { + return DeletionTime.LIVE; + } + + public Row staticRow() + { + return Rows.EMPTY_STATIC_ROW; + } + + public RowStats stats() + { + return RowStats.NO_STATS; + } + + public boolean hasNext() + { + return false; + } + + public Unfiltered next() + { + throw new NoSuchElementException(); + } + + public void remove() + { + } + + public void close() + { + } + }; + } + + public static void digest(UnfilteredRowIterator iterator, MessageDigest digest) + { + // TODO: we're not computing digest the same way that old nodes. This + // means we'll have digest mismatches during upgrade. We should pass the messaging version of + // the node this is for (which might mean computing the digest last, and won't work + // for schema (where we announce the version through gossip to everyone)) + digest.update(iterator.partitionKey().getKey().duplicate()); + iterator.partitionLevelDeletion().digest(digest); + iterator.columns().digest(digest); + FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder()); + iterator.staticRow().digest(digest); + + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + ((Row) unfiltered).digest(digest); + else + ((RangeTombstoneMarker) unfiltered).digest(digest); + } + } + + /** + * Returns an iterator that concatenate two atom iterators. + * This method assumes that both iterator are from the same partition and that the atom from + * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterator + * make sense). 
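 * <p>
 * Usage sketch (illustration only; {@code first} and {@code second} stand for two
 * iterators over the same partition, with the contents of {@code second} sorting
 * after those of {@code first}):
 * <pre>
 *     UnfilteredRowIterator whole = UnfilteredRowIterators.concat(first, second);
 *     // closing 'whole' closes both inputs, per the close() override below
 * </pre>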
+ */ + public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2) + { + assert iter1.metadata().cfId.equals(iter2.metadata().cfId) + && iter1.partitionKey().equals(iter2.partitionKey()) + && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion()) + && iter1.isReverseOrder() == iter2.isReverseOrder() + && iter1.columns().equals(iter2.columns()) + && iter1.staticRow().equals(iter2.staticRow()); + + return new AbstractUnfilteredRowIterator(iter1.metadata(), + iter1.partitionKey(), + iter1.partitionLevelDeletion(), + iter1.columns(), + iter1.staticRow(), + iter1.isReverseOrder(), + iter1.stats()) + { + protected Unfiltered computeNext() + { + if (iter1.hasNext()) + return iter1.next(); + + return iter2.hasNext() ? iter2.next() : endOfData(); + } + + @Override + public void close() + { + try + { + iter1.close(); + } + finally + { + iter2.close(); + } + } + }; + } + + public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator) + { + return new WrappingUnfilteredRowIterator(iterator) + { + private final CloningRow cloningRow = new CloningRow(); + private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size()); + + public Unfiltered next() + { + Unfiltered next = super.next(); + return next.kind() == Unfiltered.Kind.ROW + ? cloningRow.setTo((Row)next) + : clone((RangeTombstoneMarker)next); + } + + private RangeTombstoneMarker clone(RangeTombstoneMarker marker) + { + markerBuilder.reset(); + + RangeTombstone.Bound bound = marker.clustering(); + for (int i = 0; i < bound.size(); i++) + markerBuilder.writeClusteringValue(allocator.clone(bound.get(i))); + markerBuilder.writeBoundKind(bound.kind()); + if (marker.isBoundary()) + { + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; + markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime()); + } + else + { + markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime()); + } + markerBuilder.endOfMarker(); + return markerBuilder.build(); + } + + class CloningRow extends WrappingRow + { + private final CloningClustering cloningClustering = new CloningClustering(); + private final CloningCell cloningCell = new CloningCell(); + + protected Cell filterCell(Cell cell) + { + return cloningCell.setTo(cell); + } + + @Override + public Clustering clustering() + { + return cloningClustering.setTo(super.clustering()); + } + } + + class CloningClustering extends Clustering + { + private Clustering wrapped; + + public Clustering setTo(Clustering wrapped) + { + this.wrapped = wrapped; + return this; + } + + public int size() + { + return wrapped.size(); + } + + public ByteBuffer get(int i) + { + ByteBuffer value = wrapped.get(i); + return value == null ? 
null : allocator.clone(value); + } + + public ByteBuffer[] getRawValues() + { + throw new UnsupportedOperationException(); + } + } + + class CloningCell extends AbstractCell + { + private Cell wrapped; + + public Cell setTo(Cell wrapped) + { + this.wrapped = wrapped; + return this; + } + + public ColumnDefinition column() + { + return wrapped.column(); + } + + public boolean isCounterCell() + { + return wrapped.isCounterCell(); + } + + public ByteBuffer value() + { + return allocator.clone(wrapped.value()); + } + + public LivenessInfo livenessInfo() + { + return wrapped.livenessInfo(); + } + + public CellPath path() + { + CellPath path = wrapped.path(); + if (path == null) + return null; + + assert path.size() == 1; + return CellPath.create(allocator.clone(path.get(0))); + } + } + }; + } + + /** + * Turns the given iterator into an update. + * + * Warning: this method does not close the provided iterator, it is up to + * the caller to close it. + */ + public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator) + { + PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1); + + update.addPartitionDeletion(iterator.partitionLevelDeletion()); + + if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) + iterator.staticRow().copyTo(update.staticWriter()); + + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + ((Row) unfiltered).copyTo(update.writer()); + else + ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder())); + } + + return update; + } + + /** + * Validate that the data of the provided iterator is valid, that is that the values + * it contains are valid for the type they represent, and more generally that the + * infos stored are sensible. + * + * This is mainly used by scrubber to detect problems in sstables. + * + * @param iterator the partition to check. + * @param filename the name of the file the data is comming from. + * @return an iterator that returns the same data than {@code iterator} but that + * checks said data and throws a {@code CorruptedSSTableException} if it detects + * invalid data. + */ + public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename) + { + return new WrappingUnfilteredRowIterator(iterator) + { + public Unfiltered next() + { + Unfiltered next = super.next(); + try + { + next.validateData(metadata()); + return next; + } + catch (MarshalException me) + { + throw new CorruptSSTableException(me, filename); + } + } + }; + } + + /** + * Convert all expired cells to equivalent tombstones. + * <p> + * Once a cell expires, it acts exactly as a tombstone and this until it is purged. But in particular that + * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to + * replace the expired cell by an equivalent tombstone (that has no value). + * + * @param iterator the iterator in which to conver expired cells. + * @param nowInSec the current time to use to decide if a cell is expired. + * @return an iterator that returns the same data than {@code iterator} but with all expired cells converted + * to equivalent tombstones. 
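 * <p>
 * Worked example (illustrative numbers): a cell inserted at {@code t=1000} seconds with
 * {@code ttl=300} carries {@code localDeletionTime=1300}, the time at which it expires.
 * Once {@code nowInSec >= 1300}, the code below substitutes a tombstone whose local
 * deletion time is {@code localDeletionTime - ttl = 1000}, i.e. the original insertion
 * time, so the tombstone ages out on the same schedule the expiring cell would have.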
+ */ + public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec) + { + return new FilteringRowIterator(iterator) + { + protected FilteringRow makeRowFilter() + { + return new FilteringRow() + { + @Override + protected Cell filterCell(Cell cell) + { + Cell filtered = super.filterCell(cell); + if (filtered == null) + return null; + + LivenessInfo info = filtered.livenessInfo(); + if (info.hasTTL() && !filtered.isLive(nowInSec)) + { + // The column is now expired, we can safely return a simple tombstone. Note that as long as the expiring + // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility + // to repair. See discussion at + // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html + return Cells.create(filtered.column(), + filtered.isCounterCell(), + ByteBufferUtil.EMPTY_BYTE_BUFFER, + SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()), + filtered.path()); + } + else + { + return filtered; + } + } + }; + } + }; + } + + /** + * Wraps the provided iterator so it logs the returned atoms for debugging purposes. + * <p> + * Note that this is only meant for debugging as this can log a very large amount of + * logging at INFO. + */ + public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails) + { + CFMetaData metadata = iterator.metadata(); + logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}", + id, + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), + iterator.isReverseOrder(), + iterator.partitionLevelDeletion().markedForDeleteAt()); + + return new WrappingUnfilteredRowIterator(iterator) + { + @Override + public Row staticRow() + { + Row row = super.staticRow(); + if (!row.isEmpty()) + logger.info("[{}] {}", id, row.toString(metadata(), fullDetails)); + return row; + } + + @Override + public Unfiltered next() + { + Unfiltered next = super.next(); + if (next.kind() == Unfiltered.Kind.ROW) + logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails)); + else + logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata())); + return next; + } + }; + } + + /** + * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface. + */ + private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator + { + private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator; + private final MergeListener listener; + + private UnfilteredRowMergeIterator(CFMetaData metadata, + List<UnfilteredRowIterator> iterators, + PartitionColumns columns, + DeletionTime partitionDeletion, + int nowInSec, + boolean reversed, + MergeListener listener) + { + super(metadata, + iterators.get(0).partitionKey(), + partitionDeletion, + columns, + mergeStaticRows(metadata, iterators, columns.statics, nowInSec, listener, partitionDeletion), + reversed, + mergeStats(iterators)); + + this.listener = listener; + this.mergeIterator = MergeIterator.get(iterators, + reversed ? 
metadata.comparator.reversed() : metadata.comparator, + new MergeReducer(metadata, iterators.size(), reversed, nowInSec)); + } + + private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener) + { + try + { + checkForInvalidInput(iterators); + return new UnfilteredRowMergeIterator(iterators.get(0).metadata(), + iterators, + collectColumns(iterators), + collectPartitionLevelDeletion(iterators, listener), + nowInSec, + iterators.get(0).isReverseOrder(), + listener); + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + throw e; + } + } + + @SuppressWarnings("resource") // We're not really creating any resource here + private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators) + { + if (iterators.isEmpty()) + return; + + UnfilteredRowIterator first = iterators.get(0); + for (int i = 1; i < iterators.size(); i++) + { + UnfilteredRowIterator iter = iterators.get(i); + assert first.metadata().cfId.equals(iter.metadata().cfId); + assert first.partitionKey().equals(iter.partitionKey()); + assert first.isReverseOrder() == iter.isReverseOrder(); + } + } + + @SuppressWarnings("resource") // We're not really creating any resource here + private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener) + { + DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()]; + + DeletionTime delTime = DeletionTime.LIVE; + for (int i = 0; i < iterators.size(); i++) + { + UnfilteredRowIterator iter = iterators.get(i); + DeletionTime iterDeletion = iter.partitionLevelDeletion(); + if (listener != null) + versions[i] = iterDeletion; + if (!delTime.supersedes(iterDeletion)) + delTime = iterDeletion; + } + if (listener != null && !delTime.isLive()) + listener.onMergePartitionLevelDeletion(delTime, versions); + return delTime; + } + + private static Row mergeStaticRows(CFMetaData metadata, + List<UnfilteredRowIterator> iterators, + Columns columns, + int nowInSec, + MergeListener listener, + DeletionTime partitionDeletion) + { + if (columns.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener); + for (int i = 0; i < iterators.size(); i++) + merger.add(i, iterators.get(i).staticRow()); + + // Note that we should call 'takeAlias' on the result in theory, but we know that we + // won't reuse the merger and so that it's ok not to. + Row merged = merger.merge(partitionDeletion); + return merged == null ? Rows.EMPTY_STATIC_ROW : merged; + } + + private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators) + { + PartitionColumns first = iterators.get(0).columns(); + Columns statics = first.statics; + Columns regulars = first.regulars; + for (int i = 1; i < iterators.size(); i++) + { + PartitionColumns cols = iterators.get(i).columns(); + statics = statics.mergeTo(cols.statics); + regulars = regulars.mergeTo(cols.regulars); + } + return statics == first.statics && regulars == first.regulars + ? 
first + : new PartitionColumns(statics, regulars); + } + + private static RowStats mergeStats(List<UnfilteredRowIterator> iterators) + { + RowStats stats = RowStats.NO_STATS; + for (UnfilteredRowIterator iter : iterators) + stats = stats.mergeWith(iter.stats()); + return stats; + } + + protected Unfiltered computeNext() + { + while (mergeIterator.hasNext()) + { + Unfiltered merged = mergeIterator.next(); + if (merged != null) + return merged; + } + return endOfData(); + } + + public void close() + { + // This will close the input iterators + FileUtils.closeQuietly(mergeIterator); + + if (listener != null) + listener.close(); + } + + /** + * Specific reducer for merge operations that rewrite the same reusable + * row every time. This also skip cells shadowed by range tombstones when writing. + */ + private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered> + { + private Unfiltered.Kind nextKind; + + private final Row.Merger rowMerger; + private final RangeTombstoneMarker.Merger markerMerger; + + private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec) + { + this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener); + this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener); + } + + @Override + public boolean trivialReduceIsTrivial() + { + return listener == null; + } + + public void reduce(int idx, Unfiltered current) + { + nextKind = current.kind(); + if (nextKind == Unfiltered.Kind.ROW) + rowMerger.add(idx, (Row)current); + else + markerMerger.add(idx, (RangeTombstoneMarker)current); + } + + protected Unfiltered getReduced() + { + return nextKind == Unfiltered.Kind.ROW + ? rowMerger.merge(markerMerger.activeDeletion()) + : markerMerger.merge(); + } + + protected void onKeyChange() + { + if (nextKind == Unfiltered.Kind.ROW) + rowMerger.clear(); + else + markerMerger.clear(); + } + } + } + + private static class FilteringIterator extends AbstractIterator<Row> implements RowIterator + { + private final UnfilteredRowIterator iter; + private final int nowInSec; + private final TombstoneFilteringRow filter; + + public FilteringIterator(UnfilteredRowIterator iter, int nowInSec) + { + this.iter = iter; + this.nowInSec = nowInSec; + this.filter = new TombstoneFilteringRow(nowInSec); + } + + public CFMetaData metadata() + { + return iter.metadata(); + } + + public boolean isReverseOrder() + { + return iter.isReverseOrder(); + } + + public PartitionColumns columns() + { + return iter.columns(); + } + + public DecoratedKey partitionKey() + { + return iter.partitionKey(); + } + + public Row staticRow() + { + Row row = iter.staticRow(); + return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row); + } + + protected Row computeNext() + { + while (iter.hasNext()) + { + Unfiltered next = iter.next(); + if (next.kind() != Unfiltered.Kind.ROW) + continue; + + Row row = filter.setTo((Row)next); + if (!row.isEmpty()) + return row; + } + return endOfData(); + } + + public void close() + { + iter.close(); + } + } +}
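The MergeListener interface declared at the top of UnfilteredRowIterators is the hook through which a caller of the three-argument merge() can observe every merge decision. A minimal sketch of an implementation (the class name and the row-counting behaviour are hypothetical; the callback signatures are exactly those declared in the interface above, and the imports mirror this file's):

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.rows.*;

class CountingMergeListener implements UnfilteredRowIterators.MergeListener
{
    long mergedRows;

    public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) {}

    public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions)
    {
        mergedRows++; // one callback per merged row
    }

    public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions) {}
    public void onMergedCells(Cell mergedCell, Cell[] versions) {}
    public void onRowDone() {}
    public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) {}
    public void close() {}
}

// Hypothetical use:
// UnfilteredRowIterator merged = UnfilteredRowIterators.merge(iterators, nowInSec, new CountingMergeListener());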
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java new file mode 100644 index 0000000..a5a0c75 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -0,0 +1,706 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.SearchIterator; + +/** + * Serialize/deserialize a single Unfiltered for the intra-node protocol. + * + * The encode format for an unfiltered is <flags>(<row>|<marker>) where: + * + * <flags> is a byte whose bits are flags. The rightmost 1st bit is only + * set to indicate the end of the partition. The 2nd bit indicates + * whether the reminder is a range tombstone marker (otherwise it's a row). + * If it's a row then the 3rd bit indicates if it's static, the 4th bit + * indicates the presence of a row timestamp, the 5th the presence of a row + * ttl, the 6th the presence of row deletion and the 7th indicates the + * presence of complex deletion times. + * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where + * <clustering> is the row clustering as serialized by + * {@code Clustering.serializer}. Note that static row are an exception and + * don't have this. <timestamp>, <ttl> and <deletion> are the row timestamp, ttl and deletion + * whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the + * complex ones. There is actually 2 slightly different possible layout for those + * cell: a dense one and a sparse one. Which one is used depends on the serialization + * header and more precisely of {@link SerializationHeader.useSparseColumnLayout()}: + * 1) in the dense layout, there will be as many <sci> and <ccj> as there is columns + * in the serialization header. 
* Each simple column <sci> will simply be a <cell>
+ * (which might have no value, see below), while each <ccj> will be
+ * [<delTime>]<cell1>...<celln><emptyCell> where <delTime> is the deletion for
+ * this complex column (if the flags indicate it is present), <celln> are the <cell>
+ * for this complex column and <emptyCell> is a last cell that will have no value
+ * to indicate the end of this column.
+ * 2) in the sparse layout, there won't be "empty" cells, i.e. only the columns that
+ * actually have a cell are represented. For that, each <sci> and <ccj> starts
+ * with a 2 byte index that points to the column in the header it belongs to. After
+ * that, each <sci> and <ccj> is the same as in the dense layout. But contrary
+ * to the dense layout, we won't know how many elements are serialized, so a 2 byte
+ * marker with a value of -1 indicates the end of the row.
+ * <marker> is <bound><deletion> where <bound> is the marker bound as serialized
+ * by {@code Slice.Bound.serializer} and <deletion> is the marker deletion
+ * time.
+ *
+ * <cell> A cell starts with a 1 byte <flag>. The rightmost 1st bit indicates
+ * whether there is actually a value for this cell. If this flag is unset,
+ * nothing more follows for the cell. The 2nd and 3rd flags indicate whether
+ * it's a deleted or expiring cell. The 4th flag indicates if the value
+ * is empty or not. The 5th and 6th indicate whether the timestamp and ttl/
+ * localDeletionTime for the cell are the same as the row's (if that
+ * is the case, those are not repeated for the cell). Then follows the <value>
+ * (unless it's marked empty in the flag) and a delta-encoded long <timestamp>
+ * (unless the flag says to use the row-level one).
+ * Then if it's a deleted or expiring cell a delta-encoded int <localDelTime> follows,
+ * and if it's expiring a delta-encoded int <ttl> (unless it's an expiring cell
+ * and the ttl and localDeletionTime are indicated by the flags to be the same
+ * as the row ones, in which case neither appears).
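 * <p>
 * Worked example of the unfiltered {@code <flags>} byte (using the constants defined
 * just below): {@code 0x04 | 0x08 | 0x20 = 0x2C} describes a static row carrying a row
 * timestamp and a row deletion, but no ttl and no complex deletion; the end-of-partition
 * bit ({@code 0x01}) and the marker bit ({@code 0x02}) are both clear, so a row body follows.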
+ */ +public class UnfilteredSerializer +{ + private static final Logger logger = LoggerFactory.getLogger(UnfilteredSerializer.class); + + public static final UnfilteredSerializer serializer = new UnfilteredSerializer(); + + // Unfiltered flags + private final static int END_OF_PARTITION = 0x01; + private final static int IS_MARKER = 0x02; + // For rows + private final static int IS_STATIC = 0x04; + private final static int HAS_TIMESTAMP = 0x08; + private final static int HAS_TTL = 0x10; + private final static int HAS_DELETION = 0x20; + private final static int HAS_COMPLEX_DELETION = 0x40; + + // Cell flags + private final static int PRESENCE_MASK = 0x01; + private final static int DELETION_MASK = 0x02; + private final static int EXPIRATION_MASK = 0x04; + private final static int EMPTY_VALUE_MASK = 0x08; + private final static int USE_ROW_TIMESTAMP = 0x10; + private final static int USE_ROW_TTL = 0x20; + + public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version) + throws IOException + { + if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + serialize((RangeTombstoneMarker) unfiltered, header, out, version); + } + else + { + serialize((Row) unfiltered, header, out, version); + } + } + + public void serialize(Row row, SerializationHeader header, DataOutputPlus out, int version) + throws IOException + { + int flags = 0; + boolean isStatic = row.isStatic(); + + LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); + DeletionTime deletion = row.deletion(); + boolean hasComplexDeletion = row.hasComplexDeletion(); + + if (isStatic) + flags |= IS_STATIC; + if (pkLiveness.hasTimestamp()) + flags |= HAS_TIMESTAMP; + if (pkLiveness.hasTTL()) + flags |= HAS_TTL; + if (!deletion.isLive()) + flags |= HAS_DELETION; + if (hasComplexDeletion) + flags |= HAS_COMPLEX_DELETION; + + out.writeByte((byte)flags); + if (!isStatic) + Clustering.serializer.serialize(row.clustering(), out, version, header.clusteringTypes()); + + if ((flags & HAS_TIMESTAMP) != 0) + out.writeLong(header.encodeTimestamp(pkLiveness.timestamp())); + if ((flags & HAS_TTL) != 0) + { + out.writeInt(header.encodeTTL(pkLiveness.ttl())); + out.writeInt(header.encodeDeletionTime(pkLiveness.localDeletionTime())); + } + if ((flags & HAS_DELETION) != 0) + UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out); + + Columns columns = header.columns(isStatic); + int simpleCount = columns.simpleColumnCount(); + boolean useSparse = header.useSparseColumnLayout(isStatic); + SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator(); + + for (int i = 0; i < simpleCount; i++) + writeSimpleColumn(i, cells.next(columns.getSimple(i)), header, out, pkLiveness, useSparse); + + for (int i = simpleCount; i < columns.columnCount(); i++) + writeComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, out, pkLiveness, useSparse); + + if (useSparse) + out.writeShort(-1); + } + + private void writeSimpleColumn(int idx, ColumnData data, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse) + throws IOException + { + if (useSparse) + { + if (data == null) + return; + + out.writeShort(idx); + } + + writeCell(data == null ? 
null : data.cell(), header, out, rowLiveness); + } + + private void writeComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse) + throws IOException + { + Iterator<Cell> cells = data == null ? null : data.cells(); + DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion(); + + if (useSparse) + { + assert hasComplexDeletion || deletion.isLive(); + if (cells == null && deletion.isLive()) + return; + + out.writeShort(idx); + } + + if (hasComplexDeletion) + UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out); + + if (cells != null) + while (cells.hasNext()) + writeCell(cells.next(), header, out, rowLiveness); + + writeCell(null, header, out, rowLiveness); + } + + public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version) + throws IOException + { + out.writeByte((byte)IS_MARKER); + RangeTombstone.Bound.serializer.serialize(marker.clustering(), out, version, header.clusteringTypes()); + + if (marker.isBoundary()) + { + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; + UnfilteredRowIteratorSerializer.writeDelTime(bm.endDeletionTime(), header, out); + UnfilteredRowIteratorSerializer.writeDelTime(bm.startDeletionTime(), header, out); + } + else + { + UnfilteredRowIteratorSerializer.writeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime(), header, out); + } + } + + public long serializedSize(Unfiltered unfiltered, SerializationHeader header, int version, TypeSizes sizes) + { + return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER + ? serializedSize((RangeTombstoneMarker) unfiltered, header, version, sizes) + : serializedSize((Row) unfiltered, header, version, sizes); + } + + public long serializedSize(Row row, SerializationHeader header, int version, TypeSizes sizes) + { + long size = 1; // flags + + boolean isStatic = row.isStatic(); + LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); + DeletionTime deletion = row.deletion(); + boolean hasComplexDeletion = row.hasComplexDeletion(); + + if (!isStatic) + size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes(), sizes); + + if (pkLiveness.hasTimestamp()) + size += sizes.sizeof(header.encodeTimestamp(pkLiveness.timestamp())); + if (pkLiveness.hasTTL()) + { + size += sizes.sizeof(header.encodeTTL(pkLiveness.ttl())); + size += sizes.sizeof(header.encodeDeletionTime(pkLiveness.localDeletionTime())); + } + if (!deletion.isLive()) + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header, sizes); + + Columns columns = header.columns(isStatic); + int simpleCount = columns.simpleColumnCount(); + boolean useSparse = header.useSparseColumnLayout(isStatic); + SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator(); + + for (int i = 0; i < simpleCount; i++) + size += sizeOfSimpleColumn(i, cells.next(columns.getSimple(i)), header, sizes, pkLiveness, useSparse); + + for (int i = simpleCount; i < columns.columnCount(); i++) + size += sizeOfComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, sizes, pkLiveness, useSparse); + + if (useSparse) + size += sizes.sizeof((short)-1); + + return size; + } + + private long sizeOfSimpleColumn(int idx, ColumnData data, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse) + { + long size = 0; + if (useSparse) + { + if (data == 
null) + return size; + + size += sizes.sizeof((short)idx); + } + return size + sizeOfCell(data == null ? null : data.cell(), header, sizes, rowLiveness); + } + + private long sizeOfComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness, boolean useSparse) + { + long size = 0; + Iterator<Cell> cells = data == null ? null : data.cells(); + DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion(); + if (useSparse) + { + assert hasComplexDeletion || deletion.isLive(); + if (cells == null && deletion.isLive()) + return size; + + size += sizes.sizeof((short)idx); + } + + if (hasComplexDeletion) + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header, sizes); + + if (cells != null) + while (cells.hasNext()) + size += sizeOfCell(cells.next(), header, sizes, rowLiveness); + + return size + sizeOfCell(null, header, sizes, rowLiveness); + } + + public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version, TypeSizes sizes) + { + long size = 1 // flags + + RangeTombstone.Bound.serializer.serializedSize(marker.clustering(), version, header.clusteringTypes(), sizes); + + if (marker.isBoundary()) + { + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.endDeletionTime(), header, sizes); + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(bm.startDeletionTime(), header, sizes); + } + else + { + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(((RangeTombstoneBoundMarker)marker).deletionTime(), header, sizes); + } + return size; + } + + public void writeEndOfPartition(DataOutputPlus out) throws IOException + { + out.writeByte((byte)1); + } + + public long serializedSizeEndOfPartition(TypeSizes sizes) + { + return 1; + } + + public Unfiltered.Kind deserialize(DataInput in, + SerializationHeader header, + SerializationHelper helper, + Row.Writer rowWriter, + RangeTombstoneMarker.Writer markerWriter) + throws IOException + { + int flags = in.readUnsignedByte(); + if (isEndOfPartition(flags)) + return null; + + if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) + { + RangeTombstone.Bound.Kind kind = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes(), markerWriter); + deserializeMarkerBody(in, header, kind.isBoundary(), markerWriter); + return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; + } + else + { + assert !isStatic(flags); // deserializeStaticRow should be used for that. 
+ Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes(), rowWriter); + deserializeRowBody(in, header, helper, flags, rowWriter); + return Unfiltered.Kind.ROW; + } + } + + public Row deserializeStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) + throws IOException + { + int flags = in.readUnsignedByte(); + assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags); + StaticRow.Builder builder = StaticRow.builder(header.columns().statics, true, header.columns().statics.hasCounters()); + deserializeRowBody(in, header, helper, flags, builder); + return builder.build(); + } + + public void skipStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) throws IOException + { + int flags = in.readUnsignedByte(); + assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags; + skipRowBody(in, header, helper, flags); + } + + public void deserializeMarkerBody(DataInput in, + SerializationHeader header, + boolean isBoundary, + RangeTombstoneMarker.Writer writer) + throws IOException + { + if (isBoundary) + writer.writeBoundaryDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header)); + else + writer.writeBoundDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header)); + writer.endOfMarker(); + } + + public void skipMarkerBody(DataInput in, SerializationHeader header, boolean isBoundary) throws IOException + { + if (isBoundary) + { + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + } + else + { + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + } + } + + public void deserializeRowBody(DataInput in, + SerializationHeader header, + SerializationHelper helper, + int flags, + Row.Writer writer) + throws IOException + { + boolean isStatic = isStatic(flags); + boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; + boolean hasTTL = (flags & HAS_TTL) != 0; + boolean hasDeletion = (flags & HAS_DELETION) != 0; + boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0; + + long timestamp = hasTimestamp ? header.decodeTimestamp(in.readLong()) : LivenessInfo.NO_TIMESTAMP; + int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL; + int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME; + DeletionTime deletion = hasDeletion ? 
UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE; + + helper.writePartitionKeyLivenessInfo(writer, timestamp, ttl, localDeletionTime); + writer.writeRowDeletion(deletion); + + Columns columns = header.columns(isStatic); + if (header.useSparseColumnLayout(isStatic)) + { + int count = columns.columnCount(); + int simpleCount = columns.simpleColumnCount(); + int i; + while ((i = in.readShort()) >= 0) + { + if (i > count) + throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count)); + + if (i < simpleCount) + readSimpleColumn(columns.getSimple(i), in, header, helper, writer); + else + readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, writer); + } + } + else + { + for (int i = 0; i < columns.simpleColumnCount(); i++) + readSimpleColumn(columns.getSimple(i), in, header, helper, writer); + + for (int i = 0; i < columns.complexColumnCount(); i++) + readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, writer); + } + + writer.endOfRow(); + } + + private void readSimpleColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer) + throws IOException + { + if (helper.includes(column)) + readCell(column, in, header, helper, writer); + else + skipCell(column, in, header); + } + + private void readComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Writer writer) + throws IOException + { + if (helper.includes(column)) + { + helper.startOfComplexColumn(column); + + if (hasComplexDeletion) + writer.writeComplexDeletion(column, UnfilteredRowIteratorSerializer.readDelTime(in, header)); + + while (readCell(column, in, header, helper, writer)); + + helper.endOfComplexColumn(column); + } + else + { + skipComplexColumn(column, in, header, helper, hasComplexDeletion); + } + } + + public void skipRowBody(DataInput in, SerializationHeader header, SerializationHelper helper, int flags) throws IOException + { + boolean isStatic = isStatic(flags); + boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; + boolean hasTTL = (flags & HAS_TTL) != 0; + boolean hasDeletion = (flags & HAS_DELETION) != 0; + boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0; + + // Note that we don't want want to use FileUtils.skipBytesFully for anything that may not have + // the size we think due to VINT encoding + if (hasTimestamp) + in.readLong(); + if (hasTTL) + { + // ttl and localDeletionTime + in.readInt(); + in.readInt(); + } + if (hasDeletion) + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + + Columns columns = header.columns(isStatic); + if (header.useSparseColumnLayout(isStatic)) + { + int count = columns.columnCount(); + int simpleCount = columns.simpleColumnCount(); + int i; + while ((i = in.readShort()) >= 0) + { + if (i > count) + throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count)); + + if (i < simpleCount) + skipCell(columns.getSimple(i), in, header); + else + skipComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion); + } + } + else + { + for (int i = 0; i < columns.simpleColumnCount(); i++) + skipCell(columns.getSimple(i), in, header); + + for (int i = 0; i < columns.complexColumnCount(); i++) + skipComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion); + } + } + + private void 
skipComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion) + throws IOException + { + if (hasComplexDeletion) + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + + while (skipCell(column, in, header)); + } + + public static boolean isEndOfPartition(int flags) + { + return (flags & END_OF_PARTITION) != 0; + } + + public static Unfiltered.Kind kind(int flags) + { + return (flags & IS_MARKER) != 0 ? Unfiltered.Kind.RANGE_TOMBSTONE_MARKER : Unfiltered.Kind.ROW; + } + + public static boolean isStatic(int flags) + { + return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0; + } + + private void writeCell(Cell cell, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness) + throws IOException + { + if (cell == null) + { + out.writeByte((byte)0); + return; + } + + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime(); + int flags = PRESENCE_MASK; + if (!hasValue) + flags |= EMPTY_VALUE_MASK; + + if (isDeleted) + flags |= DELETION_MASK; + else if (isExpiring) + flags |= EXPIRATION_MASK; + + if (useRowTimestamp) + flags |= USE_ROW_TIMESTAMP; + if (useRowTTL) + flags |= USE_ROW_TTL; + + out.writeByte((byte)flags); + + if (hasValue) + header.getType(cell.column()).writeValue(cell.value(), out); + + if (!useRowTimestamp) + out.writeLong(header.encodeTimestamp(cell.livenessInfo().timestamp())); + + if ((isDeleted || isExpiring) && !useRowTTL) + out.writeInt(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime())); + if (isExpiring && !useRowTTL) + out.writeInt(header.encodeTTL(cell.livenessInfo().ttl())); + + if (cell.column().isComplex()) + cell.column().cellPathSerializer().serialize(cell.path(), out); + } + + private long sizeOfCell(Cell cell, SerializationHeader header, TypeSizes sizes, LivenessInfo rowLiveness) + { + long size = 1; // flags + + if (cell == null) + return size; + + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime(); + + if (hasValue) + size += header.getType(cell.column()).writtenLength(cell.value(), sizes); + + if (!useRowTimestamp) + size += sizes.sizeof(header.encodeTimestamp(cell.livenessInfo().timestamp())); + + if ((isDeleted || isExpiring) && !useRowTTL) + size += sizes.sizeof(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime())); + if (isExpiring && !useRowTTL) + size += sizes.sizeof(header.encodeTTL(cell.livenessInfo().ttl())); + + if (cell.column().isComplex()) + size += cell.column().cellPathSerializer().serializedSize(cell.path(), sizes); + + return size; + } + + private boolean readCell(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer) + throws IOException + { + int flags = in.readUnsignedByte(); + if ((flags & 
PRESENCE_MASK) == 0) + return false; + + boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & DELETION_MASK) != 0; + boolean isExpiring = (flags & EXPIRATION_MASK) != 0; + boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL) != 0; + + ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (hasValue) + { + if (helper.canSkipValue(column)) + header.getType(column).skipValue(in); + else + value = header.getType(column).readValue(in); + } + + long timestamp = useRowTimestamp ? helper.getRowTimestamp() : header.decodeTimestamp(in.readLong()); + + int localDelTime = useRowTTL + ? helper.getRowLocalDeletionTime() + : (isDeleted || isExpiring ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME); + + int ttl = useRowTTL + ? helper.getRowTTL() + : (isExpiring ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL); + + CellPath path = column.isComplex() + ? column.cellPathSerializer().deserialize(in) + : null; + + helper.writeCell(writer, column, false, value, timestamp, localDelTime, ttl, path); + + return true; + } + + private boolean skipCell(ColumnDefinition column, DataInput in, SerializationHeader header) + throws IOException + { + int flags = in.readUnsignedByte(); + if ((flags & PRESENCE_MASK) == 0) + return false; + + boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & DELETION_MASK) != 0; + boolean isExpiring = (flags & EXPIRATION_MASK) != 0; + boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL) != 0; + + if (hasValue) + header.getType(column).skipValue(in); + + if (!useRowTimestamp) + in.readLong(); + + if (!useRowTTL && (isDeleted || isExpiring)) + in.readInt(); + + if (!useRowTTL && isExpiring) + in.readInt(); + + if (column.isComplex()) + column.cellPathSerializer().skip(in); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRow.java b/src/java/org/apache/cassandra/db/rows/WrappingRow.java new file mode 100644 index 0000000..5a0cc78 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/WrappingRow.java @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.util.Collections; +import java.util.Iterator; +import java.util.NoSuchElementException; + +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.SearchIterator; + +public abstract class WrappingRow extends AbstractRow +{ + protected Row wrapped; + + private final ReusableIterator cellIterator = new ReusableIterator(); + private final ReusableSearchIterator cellSearchIterator = new ReusableSearchIterator(); + + /** + * Apply some filtering/transformation on cells. This function + * can return {@code null} in which case the cell will be ignored. + */ + protected abstract Cell filterCell(Cell cell); + + protected DeletionTime filterDeletionTime(DeletionTime deletionTime) + { + return deletionTime; + } + + protected ColumnData filterColumnData(ColumnData data) + { + if (data.column().isComplex()) + { + Iterator<Cell> cells = cellIterator.setTo(data.cells()); + DeletionTime dt = filterDeletionTime(data.complexDeletion()); + return cells == null && dt.isLive() + ? null + : new ColumnData(data.column(), null, cells == null ? Collections.emptyIterator(): cells, dt); + } + else + { + Cell filtered = filterCell(data.cell()); + return filtered == null ? null : new ColumnData(data.column(), filtered, null, null); + } + } + + public WrappingRow setTo(Row row) + { + this.wrapped = row; + return this; + } + + public Unfiltered.Kind kind() + { + return Unfiltered.Kind.ROW; + } + + public Clustering clustering() + { + return wrapped.clustering(); + } + + public Columns columns() + { + return wrapped.columns(); + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return wrapped.primaryKeyLivenessInfo(); + } + + public DeletionTime deletion() + { + return wrapped.deletion(); + } + + public boolean hasComplexDeletion() + { + // Note that because cells can be filtered out/transformed through + // filterCell(), we can't rely on wrapped.hasComplexDeletion(). + for (int i = 0; i < columns().complexColumnCount(); i++) + if (!getDeletion(columns().getComplex(i)).isLive()) + return true; + return false; + } + + public Cell getCell(ColumnDefinition c) + { + Cell cell = wrapped.getCell(c); + return cell == null ? null : filterCell(cell); + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + Cell cell = wrapped.getCell(c, path); + return cell == null ? null : filterCell(cell); + } + + public Iterator<Cell> getCells(ColumnDefinition c) + { + Iterator<Cell> cells = wrapped.getCells(c); + if (cells == null) + return null; + + cellIterator.setTo(cells); + return cellIterator.hasNext() ? 
cellIterator : null; + } + + public DeletionTime getDeletion(ColumnDefinition c) + { + return filterDeletionTime(wrapped.getDeletion(c)); + } + + public Iterator<Cell> iterator() + { + return cellIterator.setTo(wrapped.iterator()); + } + + public SearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return cellSearchIterator.setTo(wrapped.searchIterator()); + } + + public Row takeAlias() + { + boolean isCounter = columns().hasCounters(); + if (isStatic()) + { + StaticRow.Builder builder = StaticRow.builder(columns(), true, isCounter); + copyTo(builder); + return builder.build(); + } + else + { + ReusableRow copy = new ReusableRow(clustering().size(), columns(), true, isCounter); + copyTo(copy.writer()); + return copy; + } + } + + private class ReusableIterator extends UnmodifiableIterator<Cell> + { + private Iterator<Cell> iter; + private Cell next; + + public ReusableIterator setTo(Iterator<Cell> iter) + { + this.iter = iter; + this.next = null; + return this; + } + + public boolean hasNext() + { + while (next == null && iter.hasNext()) + next = filterCell(iter.next()); + return next != null; + } + + public Cell next() + { + if (next == null && !hasNext()) + throw new NoSuchElementException(); + + Cell result = next; + next = null; + return result; + } + }; + + private class ReusableSearchIterator implements SearchIterator<ColumnDefinition, ColumnData> + { + private SearchIterator<ColumnDefinition, ColumnData> iter; + + public ReusableSearchIterator setTo(SearchIterator<ColumnDefinition, ColumnData> iter) + { + this.iter = iter; + return this; + } + + public boolean hasNext() + { + return iter.hasNext(); + } + + public ColumnData next(ColumnDefinition column) + { + ColumnData data = iter.next(column); + if (data == null) + return null; + + return filterColumnData(data); + } + }; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java new file mode 100644 index 0000000..8847a47 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * Abstract class to make writing atom iterators that wrap another iterator + * easier. 
By default, the wrapping iterator simply delegate every call to + * the wrapped iterator so concrete implementations will override some of the + * methods. + */ +public abstract class WrappingRowIterator extends UnmodifiableIterator<Row> implements RowIterator +{ + protected final RowIterator wrapped; + + protected WrappingRowIterator(RowIterator wrapped) + { + this.wrapped = wrapped; + } + + public CFMetaData metadata() + { + return wrapped.metadata(); + } + + public boolean isReverseOrder() + { + return wrapped.isReverseOrder(); + } + + public PartitionColumns columns() + { + return wrapped.columns(); + } + + public DecoratedKey partitionKey() + { + return wrapped.partitionKey(); + } + + public Row staticRow() + { + return wrapped.staticRow(); + } + + public boolean hasNext() + { + return wrapped.hasNext(); + } + + public Row next() + { + return wrapped.next(); + } + + public void close() + { + wrapped.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java new file mode 100644 index 0000000..680e502 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * Abstract class to make writing atom iterators that wrap another iterator + * easier. By default, the wrapping iterator simply delegate every call to + * the wrapped iterator so concrete implementations will override some of the + * methods. 
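 * <p>
 * Sketch of the intended pattern (hypothetical wrapper; compare the anonymous subclass
 * used by {@code UnfilteredRowIterators.withValidation} earlier in this commit):
 * <pre>
 *     UnfilteredRowIterator counting = new WrappingUnfilteredRowIterator(source)
 *     {
 *         private long returned;
 *
 *         public Unfiltered next()
 *         {
 *             returned++;
 *             return super.next();
 *         }
 *     };
 * </pre>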
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/AbstractBounds.java b/src/java/org/apache/cassandra/dht/AbstractBounds.java
index b5ffc22..e295c68 100644
--- a/src/java/org/apache/cassandra/dht/AbstractBounds.java
+++ b/src/java/org/apache/cassandra/dht/AbstractBounds.java
@@ -23,7 +23,7 @@ import java.io.Serializable;
 import java.util.List;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -34,8 +34,8 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
     private static final long serialVersionUID = 1L;
 
     public static final IPartitionerDependentSerializer<AbstractBounds<Token>> tokenSerializer = new AbstractBoundsSerializer<Token>(Token.serializer);
-    public static final IPartitionerDependentSerializer<AbstractBounds<RowPosition>> rowPositionSerializer =
-        new AbstractBoundsSerializer<RowPosition>(RowPosition.serializer);
+    public static final IPartitionerDependentSerializer<AbstractBounds<PartitionPosition>> rowPositionSerializer =
+        new AbstractBoundsSerializer<PartitionPosition>(PartitionPosition.serializer);
 
     private enum Type
     {
@@ -112,6 +112,9 @@ public abstract class AbstractBounds<T extends RingPosition<T>> implements Seria
     protected abstract String getOpeningString();
     protected abstract String getClosingString();
 
+    public abstract boolean isStartInclusive();
+    public abstract boolean isEndInclusive();
+
     public abstract AbstractBounds<T> withNewRight(T newRight);
 
     public static class AbstractBoundsSerializer<T extends RingPosition<T>> implements IPartitionerDependentSerializer<AbstractBounds<T>>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Bounds.java b/src/java/org/apache/cassandra/dht/Bounds.java
index 4a5a701..b569349 100644
--- a/src/java/org/apache/cassandra/dht/Bounds.java
+++ b/src/java/org/apache/cassandra/dht/Bounds.java
@@ -20,7 +20,7 @@ package org.apache.cassandra.dht;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -102,12 +102,22 @@ public class Bounds<T extends RingPosition<T>> extends AbstractBounds<T>
         return "]";
     }
 
+    public boolean isStartInclusive()
+    {
+        return true;
+    }
+
+    public boolean isEndInclusive()
+    {
+        return true;
+    }
+
     /**
      * Compute the bounds of keys corresponding to a given bounds of tokens.
      */
-    public static Bounds<RowPosition> makeRowBounds(Token left, Token right)
+    public static Bounds<PartitionPosition> makeRowBounds(Token left, Token right)
     {
-        return new Bounds<RowPosition>(left.minKeyBound(), right.maxKeyBound());
+        return new Bounds<PartitionPosition>(left.minKeyBound(), right.maxKeyBound());
     }
 
     public AbstractBounds<T> withNewRight(T newRight)
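Taken together, the four AbstractBounds subclasses (Bounds above; Range, ExcludingBounds and IncludingExcludingBounds below) cover every combination of start/end inclusiveness: Bounds is [left, right], Range is (left, right], ExcludingBounds is (left, right) and IncludingExcludingBounds is [left, right). The two new abstract accessors let generic code query that shape without instanceof checks. A minimal sketch of such a consumer, assuming only the public left/right fields and the two new methods (the helper name is ours, not the commit's):

    import org.apache.cassandra.dht.AbstractBounds;
    import org.apache.cassandra.dht.RingPosition;

    // Hypothetical helper: render any bounds with conventional interval
    // brackets, driven purely by the new inclusiveness accessors.
    public final class BoundsFormatter
    {
        private BoundsFormatter() {}

        public static <T extends RingPosition<T>> String format(AbstractBounds<T> bounds)
        {
            return (bounds.isStartInclusive() ? "[" : "(")
                 + bounds.left + ", " + bounds.right
                 + (bounds.isEndInclusive() ? "]" : ")");
        }
    }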
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/ExcludingBounds.java b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
index 0d37e5c..86af68d 100644
--- a/src/java/org/apache/cassandra/dht/ExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/ExcludingBounds.java
@@ -90,6 +90,16 @@ public class ExcludingBounds<T extends RingPosition<T>> extends AbstractBounds<T
         return ")";
     }
 
+    public boolean isStartInclusive()
+    {
+        return false;
+    }
+
+    public boolean isEndInclusive()
+    {
+        return false;
+    }
+
     public AbstractBounds<T> withNewRight(T newRight)
     {
         return new ExcludingBounds<T>(left, newRight);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
index e9e8e8e..446d0af 100644
--- a/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
+++ b/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
@@ -89,6 +89,16 @@ public class IncludingExcludingBounds<T extends RingPosition<T>> extends Abstrac
         return ")";
     }
 
+    public boolean isStartInclusive()
+    {
+        return true;
+    }
+
+    public boolean isEndInclusive()
+    {
+        return false;
+    }
+
     public AbstractBounds<T> withNewRight(T newRight)
     {
         return new IncludingExcludingBounds<T>(left, newRight);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Range.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index cbf093c..f99716b 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
 import java.util.*;
 
 import org.apache.commons.lang3.ObjectUtils;
-import org.apache.cassandra.db.RowPosition;
+
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -372,6 +373,16 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
         return "]";
     }
 
+    public boolean isStartInclusive()
+    {
+        return false;
+    }
+
+    public boolean isEndInclusive()
+    {
+        return true;
+    }
+
     public List<String> asList()
     {
         ArrayList<String> ret = new ArrayList<String>(2);
@@ -465,12 +476,12 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen
 
     /**
      * Compute the range of keys corresponding to a given range of tokens.
      */
-    public static Range<RowPosition> makeRowRange(Token left, Token right)
+    public static Range<PartitionPosition> makeRowRange(Token left, Token right)
     {
-        return new Range<RowPosition>(left.maxKeyBound(), right.maxKeyBound());
+        return new Range<PartitionPosition>(left.maxKeyBound(), right.maxKeyBound());
     }
 
-    public static Range<RowPosition> makeRowRange(Range<Token> tokenBounds)
+    public static Range<PartitionPosition> makeRowRange(Range<Token> tokenBounds)
     {
         return makeRowRange(tokenBounds.left, tokenBounds.right);
     }
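Note the asymmetry between the two key-range factories: a token Range is start-exclusive and end-inclusive, so makeRowRange starts at left.maxKeyBound(), which sorts after every key whose token equals left, whereas the fully inclusive Bounds.makeRowBounds starts at left.minKeyBound(). A short sketch of the contrast, assuming Murmur3Partitioner (class and values are illustrative only):

    import org.apache.cassandra.db.PartitionPosition;
    import org.apache.cassandra.dht.Bounds;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;

    public class RowRangeExample
    {
        public static void main(String[] args)
        {
            Token left = new Murmur3Partitioner.LongToken(100L);
            Token right = new Murmur3Partitioner.LongToken(200L);

            // (100, 200] over tokens: keys whose token is exactly 100 must
            // be excluded, so the start is left.maxKeyBound().
            Range<PartitionPosition> keyRange = Range.makeRowRange(left, right);

            // [100, 200] over tokens: keys whose token is 100 are included,
            // so the start is left.minKeyBound().
            Bounds<PartitionPosition> keyBounds = Bounds.makeRowBounds(left, right);

            System.out.println(keyRange + " / " + keyBounds);
        }
    }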
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/dht/Token.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index 0cc8a2d..c87b46b 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.db.RowPosition;
+import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -142,7 +142,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
         return (R)maxKeyBound();
     }
 
-    public static class KeyBound implements RowPosition
+    public static class KeyBound implements PartitionPosition
     {
         private final Token token;
         public final boolean isMinimumBound;
@@ -158,7 +158,7 @@ public abstract class Token implements RingPosition<Token>, Serializable
             return token;
         }
 
-        public int compareTo(RowPosition pos)
+        public int compareTo(PartitionPosition pos)
         {
             if (this == pos)
                 return 0;
@@ -188,9 +188,9 @@ public abstract class Token implements RingPosition<Token>, Serializable
             return getToken().isMinimum();
         }
 
-        public RowPosition.Kind kind()
+        public PartitionPosition.Kind kind()
        {
-            return isMinimumBound ? RowPosition.Kind.MIN_BOUND : RowPosition.Kind.MAX_BOUND;
+            return isMinimumBound ? PartitionPosition.Kind.MIN_BOUND : PartitionPosition.Kind.MAX_BOUND;
        }
 
         @Override
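KeyBound is what makes those conversions well defined: for a token t, t.minKeyBound() compares before every partition key with token t, and t.maxKeyBound() compares after all of them, while both compare purely by token against keys of other tokens. A sketch of that ordering, assuming Murmur3Partitioner (run with -ea for the asserts; illustrative only):

    import org.apache.cassandra.db.DecoratedKey;
    import org.apache.cassandra.dht.IPartitioner;
    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class KeyBoundOrderingExample
    {
        public static void main(String[] args)
        {
            IPartitioner partitioner = Murmur3Partitioner.instance;
            DecoratedKey key = partitioner.decorateKey(ByteBufferUtil.bytes("some key"));
            Token token = key.getToken();

            // Both checks go through KeyBound.compareTo(PartitionPosition):
            // the MIN_BOUND sorts before, and the MAX_BOUND after, every
            // key sharing the token.
            assert token.minKeyBound().compareTo(key) < 0;
            assert token.maxKeyBound().compareTo(key) > 0;
        }
    }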