http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/MemtableRowData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/MemtableRowData.java b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java new file mode 100644 index 0000000..cad0765 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/MemtableRowData.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * Row data stored inside a memtable. + * + * This has methods like dataSize and unsharedHeapSizeExcludingData that are + * specific to memtables. 
+ */ +public interface MemtableRowData extends Clusterable +{ + public Columns columns(); + + public int dataSize(); + + // returns the size of the Row and all references on the heap, excluding any costs associated with byte arrays + // that would be allocated by a clone operation, as these will be accounted for by the allocator + public long unsharedHeapSizeExcludingData(); + + public interface ReusableRow extends Row + { + public ReusableRow setTo(MemtableRowData rowData); + } + + public class BufferRowData implements MemtableRowData + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferRowData(null, LivenessInfo.NONE, DeletionTime.LIVE, null)); + + private final Clustering clustering; + private final LivenessInfo livenessInfo; + private final DeletionTime deletion; + private final RowDataBlock dataBlock; + + public BufferRowData(Clustering clustering, LivenessInfo livenessInfo, DeletionTime deletion, RowDataBlock dataBlock) + { + this.clustering = clustering; + this.livenessInfo = livenessInfo.takeAlias(); + this.deletion = deletion.takeAlias(); + this.dataBlock = dataBlock; + } + + public Clustering clustering() + { + return clustering; + } + + public Columns columns() + { + return dataBlock.columns(); + } + + public int dataSize() + { + return clustering.dataSize() + livenessInfo.dataSize() + deletion.dataSize() + dataBlock.dataSize(); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + + (clustering == Clustering.STATIC_CLUSTERING ? 
0 : ((BufferClustering)clustering).unsharedHeapSizeExcludingData()) + + dataBlock.unsharedHeapSizeExcludingData(); + } + + public static ReusableRow createReusableRow() + { + return new BufferRow(); + } + + private static class BufferRow extends AbstractReusableRow implements ReusableRow + { + private BufferRowData rowData; + + private BufferRow() + { + } + + public ReusableRow setTo(MemtableRowData rowData) + { + assert rowData instanceof BufferRowData; + this.rowData = (BufferRowData)rowData; + return this; + } + + protected RowDataBlock data() + { + return rowData.dataBlock; + } + + protected int row() + { + return 0; + } + + public Clustering clustering() + { + return rowData.clustering; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return rowData.livenessInfo; + } + + public DeletionTime deletion() + { + return rowData.deletion; + } + } + } + + public class BufferClustering extends Clustering + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferClustering(0)); + + private final ByteBuffer[] values; + + public BufferClustering(int size) + { + this.values = new ByteBuffer[size]; + } + + public void setClusteringValue(int i, ByteBuffer value) + { + values[i] = value; + } + + public int size() + { + return values.length; + } + + public ByteBuffer get(int i) + { + return values[i]; + } + + public ByteBuffer[] getRawValues() + { + return values; + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values); + } + + @Override + public long unsharedHeapSize() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values); + } + + public Clustering takeAlias() + { + return this; + } + } + + public class BufferCellPath extends CellPath.SimpleCellPath + { + private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCellPath(new ByteBuffer[0])); + + private BufferCellPath(ByteBuffer[] values) + { + super(values); + } + + public static BufferCellPath clone(CellPath path, 
AbstractAllocator allocator) + { + int size = path.size(); + ByteBuffer[] values = new ByteBuffer[size]; + for (int i = 0; i < size; i++) + values[i] = allocator.clone(path.get(0)); + return new BufferCellPath(values); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(values); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java new file mode 100644 index 0000000..b5ac19b --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundMarker.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * A range tombstone marker that indicates the bound of a range tombstone (start or end). 
+ */ +public class RangeTombstoneBoundMarker extends AbstractRangeTombstoneMarker +{ + private final DeletionTime deletion; + + public RangeTombstoneBoundMarker(RangeTombstone.Bound bound, DeletionTime deletion) + { + super(bound); + assert bound.kind().isBound(); + this.deletion = deletion; + } + + public RangeTombstoneBoundMarker(Slice.Bound bound, DeletionTime deletion) + { + this(new RangeTombstone.Bound(bound.kind(), bound.getRawValues()), deletion); + } + + public static RangeTombstoneBoundMarker inclusiveStart(ClusteringPrefix clustering, DeletionTime deletion) + { + return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_START_BOUND, clustering.getRawValues()), deletion); + } + + public static RangeTombstoneBoundMarker inclusiveEnd(ClusteringPrefix clustering, DeletionTime deletion) + { + return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(RangeTombstone.Bound.Kind.INCL_END_BOUND, clustering.getRawValues()), deletion); + } + + public static RangeTombstoneBoundMarker inclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion) + { + RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveOpen(reversed, boundValues); + return new RangeTombstoneBoundMarker(bound, deletion); + } + + public static RangeTombstoneBoundMarker exclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion) + { + RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveOpen(reversed, boundValues); + return new RangeTombstoneBoundMarker(bound, deletion); + } + + public static RangeTombstoneBoundMarker inclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion) + { + RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveClose(reversed, boundValues); + return new RangeTombstoneBoundMarker(bound, deletion); + } + + public static RangeTombstoneBoundMarker exclusiveClose(boolean reversed, ByteBuffer[] boundValues, DeletionTime deletion) + { + RangeTombstone.Bound bound = 
RangeTombstone.Bound.exclusiveClose(reversed, boundValues); + return new RangeTombstoneBoundMarker(bound, deletion); + } + + public boolean isBoundary() + { + return false; + } + + /** + * The deletion time for the range tombstone this is a bound of. + */ + public DeletionTime deletionTime() + { + return deletion; + } + + public boolean isOpen(boolean reversed) + { + return bound.kind().isOpen(reversed); + } + + public boolean isClose(boolean reversed) + { + return bound.kind().isClose(reversed); + } + + public DeletionTime openDeletionTime(boolean reversed) + { + if (!isOpen(reversed)) + throw new IllegalStateException(); + return deletion; + } + + public DeletionTime closeDeletionTime(boolean reversed) + { + if (isOpen(reversed)) + throw new IllegalStateException(); + return deletion; + } + + public void copyTo(RangeTombstoneMarker.Writer writer) + { + copyBoundTo(writer); + writer.writeBoundDeletion(deletion); + writer.endOfMarker(); + } + + public void digest(MessageDigest digest) + { + bound.digest(digest); + deletion.digest(digest); + } + + public String toString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + sb.append("Marker "); + sb.append(bound.toString(metadata)); + sb.append("@").append(deletion.markedForDeleteAt()); + return sb.toString(); + } + + @Override + public boolean equals(Object other) + { + if(!(other instanceof RangeTombstoneBoundMarker)) + return false; + + RangeTombstoneBoundMarker that = (RangeTombstoneBoundMarker)other; + return this.bound.equals(that.bound) + && this.deletion.equals(that.deletion); + } + + @Override + public int hashCode() + { + return Objects.hash(bound, deletion); + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java 
b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java new file mode 100644 index 0000000..1140d40 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneBoundaryMarker.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * A range tombstone marker that represents a boundary between 2 range tombstones (i.e. it closes one range and open another). 
+ */ +public class RangeTombstoneBoundaryMarker extends AbstractRangeTombstoneMarker +{ + private final DeletionTime endDeletion; + private final DeletionTime startDeletion; + + public RangeTombstoneBoundaryMarker(RangeTombstone.Bound bound, DeletionTime endDeletion, DeletionTime startDeletion) + { + super(bound); + assert bound.kind().isBoundary(); + this.endDeletion = endDeletion; + this.startDeletion = startDeletion; + } + + public static RangeTombstoneBoundaryMarker exclusiveCloseInclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion) + { + RangeTombstone.Bound bound = RangeTombstone.Bound.exclusiveCloseInclusiveOpen(reversed, boundValues); + DeletionTime endDeletion = reversed ? openDeletion : closeDeletion; + DeletionTime startDeletion = reversed ? closeDeletion : openDeletion; + return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion); + } + + public static RangeTombstoneBoundaryMarker inclusiveCloseExclusiveOpen(boolean reversed, ByteBuffer[] boundValues, DeletionTime closeDeletion, DeletionTime openDeletion) + { + RangeTombstone.Bound bound = RangeTombstone.Bound.inclusiveCloseExclusiveOpen(reversed, boundValues); + DeletionTime endDeletion = reversed ? openDeletion : closeDeletion; + DeletionTime startDeletion = reversed ? closeDeletion : openDeletion; + return new RangeTombstoneBoundaryMarker(bound, endDeletion, startDeletion); + } + + public boolean isBoundary() + { + return true; + } + + /** + * The deletion time for the range tombstone this boundary ends (in clustering order). + */ + public DeletionTime endDeletionTime() + { + return endDeletion; + } + + /** + * The deletion time for the range tombstone this boundary starts (in clustering order). + */ + public DeletionTime startDeletionTime() + { + return startDeletion; + } + + public DeletionTime closeDeletionTime(boolean reversed) + { + return reversed ? 
startDeletion : endDeletion; + } + + public DeletionTime openDeletionTime(boolean reversed) + { + return reversed ? endDeletion : startDeletion; + } + + public boolean isOpen(boolean reversed) + { + // A boundary always open one side + return true; + } + + public boolean isClose(boolean reversed) + { + // A boundary always close one side + return true; + } + + public static boolean isBoundary(ClusteringComparator comparator, Slice.Bound close, Slice.Bound open) + { + if (!comparator.isOnSameClustering(close, open)) + return false; + + // If both bound are exclusive, then it's not a boundary, otherwise it is one. + // Note that most code should never call this with 2 inclusive bound: this would mean we had + // 2 RTs that were overlapping and RangeTombstoneList don't create that. However, old + // code was generating that so supporting this case helps dealing with backward compatibility. + return close.isInclusive() || open.isInclusive(); + } + + // Please note that isBoundary *must* have been called (and returned true) before this is called. + public static RangeTombstoneBoundaryMarker makeBoundary(boolean reversed, Slice.Bound close, Slice.Bound open, DeletionTime closeDeletion, DeletionTime openDeletion) + { + boolean isExclusiveClose = close.isExclusive() || (close.isInclusive() && open.isInclusive() && openDeletion.supersedes(closeDeletion)); + return isExclusiveClose + ? 
exclusiveCloseInclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion) + : inclusiveCloseExclusiveOpen(reversed, close.getRawValues(), closeDeletion, openDeletion); + } + + public RangeTombstoneBoundMarker createCorrespondingCloseBound(boolean reversed) + { + return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().closeBoundOfBoundary(reversed)), endDeletion); + } + + public RangeTombstoneBoundMarker createCorrespondingOpenBound(boolean reversed) + { + return new RangeTombstoneBoundMarker(bound.withNewKind(bound.kind().openBoundOfBoundary(reversed)), startDeletion); + } + + public void copyTo(RangeTombstoneMarker.Writer writer) + { + copyBoundTo(writer); + writer.writeBoundaryDeletion(endDeletion, startDeletion); + writer.endOfMarker(); + } + + public void digest(MessageDigest digest) + { + bound.digest(digest); + endDeletion.digest(digest); + startDeletion.digest(digest); + } + + public String toString(CFMetaData metadata) + { + StringBuilder sb = new StringBuilder(); + sb.append("Marker "); + sb.append(bound.toString(metadata)); + sb.append("@").append(endDeletion.markedForDeleteAt()).append("-").append(startDeletion.markedForDeleteAt()); + return sb.toString(); + } + + @Override + public boolean equals(Object other) + { + if(!(other instanceof RangeTombstoneBoundaryMarker)) + return false; + + RangeTombstoneBoundaryMarker that = (RangeTombstoneBoundaryMarker)other; + return this.bound.equals(that.bound) + && this.endDeletion.equals(that.endDeletion) + && this.startDeletion.equals(that.startDeletion); + } + + @Override + public int hashCode() + { + return Objects.hash(bound, endDeletion, startDeletion); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java 
b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java new file mode 100644 index 0000000..1a506d5 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RangeTombstoneMarker.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * A marker for a range tombstone bound. + * <p> + * There is 2 types of markers: bounds (see {@link RangeTombstoneBound}) and boundaries (see {@link RangeTombstoneBoundary}). 
+ */ +public interface RangeTombstoneMarker extends Unfiltered +{ + @Override + public RangeTombstone.Bound clustering(); + + public boolean isBoundary(); + + public void copyTo(RangeTombstoneMarker.Writer writer); + + public boolean isOpen(boolean reversed); + public boolean isClose(boolean reversed); + public DeletionTime openDeletionTime(boolean reversed); + public DeletionTime closeDeletionTime(boolean reversed); + + public interface Writer extends Slice.Bound.Writer + { + public void writeBoundDeletion(DeletionTime deletion); + public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion); + public void endOfMarker(); + } + + public static class Builder implements Writer + { + private final ByteBuffer[] values; + private int size; + + private RangeTombstone.Bound.Kind kind; + private DeletionTime firstDeletion; + private DeletionTime secondDeletion; + + public Builder(int maxClusteringSize) + { + this.values = new ByteBuffer[maxClusteringSize]; + } + + public void writeClusteringValue(ByteBuffer value) + { + values[size++] = value; + } + + public void writeBoundKind(RangeTombstone.Bound.Kind kind) + { + this.kind = kind; + } + + public void writeBoundDeletion(DeletionTime deletion) + { + firstDeletion = deletion; + } + + public void writeBoundaryDeletion(DeletionTime endDeletion, DeletionTime startDeletion) + { + firstDeletion = endDeletion; + secondDeletion = startDeletion; + } + + public void endOfMarker() + { + } + + public RangeTombstoneMarker build() + { + assert kind != null : "Nothing has been written"; + if (kind.isBoundary()) + return new RangeTombstoneBoundaryMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion, secondDeletion); + else + return new RangeTombstoneBoundMarker(new RangeTombstone.Bound(kind, Arrays.copyOfRange(values, 0, size)), firstDeletion); + } + + public Builder reset() + { + Arrays.fill(values, null); + size = 0; + kind = null; + return this; + } + } + + /** + * Utility 
class to help merging range tombstone markers coming from multiple inputs (UnfilteredRowIterators). + * <p> + * The assumption that each individual input must validate and that we must preserve in the output is that every + * open marker has a corresponding close marker with the exact same deletion info, and that there is no other range + * tombstone marker between those open and close marker (of course, they could be rows in between). In other word, + * for any {@code UnfilteredRowIterator}, you only ever have to remenber the last open marker (if any) to have the + * full picture of what is deleted by range tombstones at any given point of iterating that iterator. + * <p> + * Note that this class can merge both forward and reverse iterators. To deal with reverse, we just reverse how we + * deal with open and close markers (in forward order, we'll get open-close, open-close, ..., while in reverse we'll + * get close-open, close-open, ...). + */ + public static class Merger + { + // Boundaries sorts like the bound that have their equivalent "inclusive" part and that's the main action we + // care about as far as merging goes. So MergedKind just group those as the same case, and tell us whether + // we're dealing with an open or a close (based on whether we're dealing with reversed iterators or not). + // Really this enum is just a convenience for merging. + private enum MergedKind + { + INCL_OPEN, EXCL_CLOSE, EXCL_OPEN, INCL_CLOSE; + + public static MergedKind forBound(RangeTombstone.Bound bound, boolean reversed) + { + switch (bound.kind()) + { + case INCL_START_BOUND: + case EXCL_END_INCL_START_BOUNDARY: + return reversed ? INCL_CLOSE : INCL_OPEN; + case EXCL_END_BOUND: + return reversed ? EXCL_OPEN : EXCL_CLOSE; + case EXCL_START_BOUND: + return reversed ? EXCL_CLOSE : EXCL_OPEN; + case INCL_END_EXCL_START_BOUNDARY: + case INCL_END_BOUND: + return reversed ? 
INCL_OPEN : INCL_CLOSE; + } + throw new AssertionError(); + } + } + + private final CFMetaData metadata; + private final UnfilteredRowIterators.MergeListener listener; + private final DeletionTime partitionDeletion; + private final boolean reversed; + + private RangeTombstone.Bound bound; + private final RangeTombstoneMarker[] markers; + + // For each iterator, what is the currently open marker deletion time (or null if there is no open marker on that iterator) + private final DeletionTime[] openMarkers; + // The index in openMarkers of the "biggest" marker, the one with the biggest deletion time. Is < 0 iff there is no open + // marker on any iterator. + private int biggestOpenMarker = -1; + + public Merger(CFMetaData metadata, int size, DeletionTime partitionDeletion, boolean reversed, UnfilteredRowIterators.MergeListener listener) + { + this.metadata = metadata; + this.listener = listener; + this.partitionDeletion = partitionDeletion; + this.reversed = reversed; + + this.markers = new RangeTombstoneMarker[size]; + this.openMarkers = new DeletionTime[size]; + } + + public void clear() + { + Arrays.fill(markers, null); + } + + public void add(int i, RangeTombstoneMarker marker) + { + bound = marker.clustering(); + markers[i] = marker; + } + + public RangeTombstoneMarker merge() + { + /* + * Merging of range tombstones works this way: + * 1) We remember what is the currently open marker in the merged stream + * 2) We update our internal states of what range is opened on the input streams based on the new markers to merge + * 3) We compute what should be the state in the merge stream after 2) + * 4) We return what marker should be issued on the merged stream based on the difference between the state from 1) and 3) + */ + + DeletionTime previousDeletionTimeInMerged = currentOpenDeletionTimeInMerged(); + + updateOpenMarkers(); + + DeletionTime newDeletionTimeInMerged = currentOpenDeletionTimeInMerged(); + if 
(previousDeletionTimeInMerged.equals(newDeletionTimeInMerged)) + return null; + + ByteBuffer[] values = bound.getRawValues(); + + RangeTombstoneMarker merged; + switch (MergedKind.forBound(bound, reversed)) + { + case INCL_OPEN: + merged = previousDeletionTimeInMerged.isLive() + ? RangeTombstoneBoundMarker.inclusiveOpen(reversed, values, newDeletionTimeInMerged) + : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); + break; + case EXCL_CLOSE: + merged = newDeletionTimeInMerged.isLive() + ? RangeTombstoneBoundMarker.exclusiveClose(reversed, values, previousDeletionTimeInMerged) + : RangeTombstoneBoundaryMarker.exclusiveCloseInclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); + break; + case EXCL_OPEN: + merged = previousDeletionTimeInMerged.isLive() + ? RangeTombstoneBoundMarker.exclusiveOpen(reversed, values, newDeletionTimeInMerged) + : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); + break; + case INCL_CLOSE: + merged = newDeletionTimeInMerged.isLive() + ? RangeTombstoneBoundMarker.inclusiveClose(reversed, values, previousDeletionTimeInMerged) + : RangeTombstoneBoundaryMarker.inclusiveCloseExclusiveOpen(reversed, values, previousDeletionTimeInMerged, newDeletionTimeInMerged); + break; + default: + throw new AssertionError(); + } + + if (listener != null) + listener.onMergedRangeTombstoneMarkers(merged, markers); + + return merged; + } + + private DeletionTime currentOpenDeletionTimeInMerged() + { + if (biggestOpenMarker < 0) + return DeletionTime.LIVE; + + DeletionTime biggestDeletionTime = openMarkers[biggestOpenMarker]; + // it's only open in the merged iterator if it's not shadowed by the partition level deletion + return partitionDeletion.supersedes(biggestDeletionTime) ? 
DeletionTime.LIVE : biggestDeletionTime.takeAlias(); + } + + private void updateOpenMarkers() + { + for (int i = 0; i < markers.length; i++) + { + RangeTombstoneMarker marker = markers[i]; + if (marker == null) + continue; + + // Note that we can have boundaries that are both open and close, but in that case all we care about + // is what it the open deletion after the marker, so we favor the opening part in this case. + if (marker.isOpen(reversed)) + openMarkers[i] = marker.openDeletionTime(reversed).takeAlias(); + else + openMarkers[i] = null; + } + + // Recompute what is now the biggest open marker + biggestOpenMarker = -1; + for (int i = 0; i < openMarkers.length; i++) + { + if (openMarkers[i] != null && (biggestOpenMarker < 0 || openMarkers[i].supersedes(openMarkers[biggestOpenMarker]))) + biggestOpenMarker = i; + } + } + + public DeletionTime activeDeletion() + { + DeletionTime openMarker = currentOpenDeletionTimeInMerged(); + // We only have an open marker in the merged stream if it's not shadowed by the partition deletion (which can be LIVE itself), so + // if have an open marker, we know it's the "active" deletion for the merged stream. + return openMarker.isLive() ? partitionDeletion : openMarker; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/ReusableRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ReusableRow.java b/src/java/org/apache/cassandra/db/rows/ReusableRow.java new file mode 100644 index 0000000..0135afc --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/ReusableRow.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.db.*; + +public class ReusableRow extends AbstractReusableRow +{ + private final ReusableClustering clustering; + + private final ReusableLivenessInfo liveness = new ReusableLivenessInfo(); + + private DeletionTime deletion = DeletionTime.LIVE; + + private final RowDataBlock data; + private final Writer writer; + + public ReusableRow(int clusteringSize, Columns columns, boolean inOrderCells, boolean isCounter) + { + this.clustering = new ReusableClustering(clusteringSize); + this.data = new RowDataBlock(columns, 1, false, isCounter); + this.writer = new Writer(data, inOrderCells); + } + + protected RowDataBlock data() + { + return data; + } + + protected int row() + { + return 0; + } + + public Clustering clustering() + { + return clustering; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return liveness; + } + + public DeletionTime deletion() + { + return deletion; + } + + public Row.Writer writer() + { + return writer.reset(); + } + + private class Writer extends RowDataBlock.Writer + { + public Writer(RowDataBlock data, boolean inOrderCells) + { + super(data, inOrderCells); + } + + public void writeClusteringValue(ByteBuffer buffer) + { + clustering.writer().writeClusteringValue(buffer); + } + + public void writePartitionKeyLivenessInfo(LivenessInfo info) + { + 
ReusableRow.this.liveness.setTo(info); + } + + public void writeRowDeletion(DeletionTime deletion) + { + ReusableRow.this.deletion = deletion; + } + + @Override + public Writer reset() + { + super.reset(); + clustering.reset(); + liveness.reset(); + deletion = DeletionTime.LIVE; + return this; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java new file mode 100644 index 0000000..545da7a --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.SearchIterator; + +/** + * Storage engine representation of a row. 
+ * + * A row is identified by its clustering column values (it's an Unfiltered), + * has row level information (deletion and partition key liveness infos (see below)) + * and contains data (Cells) regarding the columns it contains. + * + * A row implements {@code WithLivenessInfo} and has thus a timestamp, ttl and + * local deletion time. That information does not apply to the row content, it + * applies to the partition key columns. In other words, the timestamp is the + * timestamp for the partition key columns: it is what allows us to distinguish + * between a dead row, and a live row but for which only the partition key columns + * are set. The ttl and local deletion time information are for the case where + * a TTL is set on those partition key columns. Note however that a row can have + * live cells but no partition key columns timestamp, because said timestamp (and + * its corresponding ttl) is only set on INSERT (not UPDATE). + */ +public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> +{ + /** + * The clustering values for this row. + */ + @Override + public Clustering clustering(); + + /** + * The columns this row contains. + * + * Note that this is actually a superset of the columns the row contains. The row + * may not have values for each of those columns, but it can't have values for other + * columns. + * + * @return a superset of the columns contained in this row. + */ + public Columns columns(); + + /** + * The row deletion. + * + * This corresponds to the last row deletion done on this row. + * + * @return the row deletion. + */ + public DeletionTime deletion(); + + /** + * Liveness information for the primary key columns of this row. + * <p> + * As a row is uniquely identified by its primary key, all its primary key columns + * share the same {@code LivenessInfo}. 
This liveness information is what allows us + * to distinguish between a dead row (it has no live cells and its primary key liveness + * info has no timestamp) and a live row but where all non PK columns are null (it has no + * live cells, but its primary key liveness has a timestamp). Please note that the ttl + * (and local deletion time) of the PK liveness information only apply to the + * liveness info timestamp, and not to the content of the row. Also note that because + * in practice there is no way to only delete the primary key columns (without deleting + * the row itself), the returned {@code LivenessInfo} can only have a local deletion time + * if it has a TTL. + * <p> + * Lastly, note that it is possible for a row to have live cells but no PK liveness + * info timestamp, because said timestamp is only set on {@code INSERT} (which makes sense + * in itself, see #6782) but live cells can be added through {@code UPDATE} even if the row + * wasn't pre-existing (which users are encouraged not to do, but we can't validate). + */ + public LivenessInfo primaryKeyLivenessInfo(); + + /** + * Whether the row corresponds to a static row or not. + * + * @return whether the row corresponds to a static row or not. + */ + public boolean isStatic(); + + /** + * Whether the row has no information whatsoever. This means no row infos + * (timestamp, ttl, deletion), no cells and no complex deletion info. + * + * @return {@code true} if the row has no data whatsoever, {@code false} otherwise. + */ + public boolean isEmpty(); + + /** + * Whether the row has some live information (i.e. it's not just deletion information). + */ + public boolean hasLiveData(int nowInSec); + + /** + * Whether or not this row contains any deletion for a complex column. That is if + * there is at least one column for which {@code getDeletion} returns a non + * live deletion time. + */ + public boolean hasComplexDeletion(); + + /** + * Returns a cell for a simple column. 
+ * + * Calls to this method are allowed to return the same Cell object, and hence the returned + * object is only valid until the next getCell/getCells call on the same Row object. You will need + * to copy the returned data if you plan on using a reference to the Cell object + * longer than that. + * + * @param c the simple column for which to fetch the cell. + * @return the corresponding cell or {@code null} if the row has no such cell. + */ + public Cell getCell(ColumnDefinition c); + + /** + * Return a cell for a given complex column and cell path. + * + * Calls to this method are allowed to return the same Cell object, and hence the returned + * object is only valid until the next getCell/getCells call on the same Row object. You will need + * to copy the returned data if you plan on using a reference to the Cell object + * longer than that. + * + * @param c the complex column for which to fetch the cell. + * @param path the cell path for which to fetch the cell. + * @return the corresponding cell or {@code null} if the row has no such cell. + */ + public Cell getCell(ColumnDefinition c, CellPath path); + + /** + * Returns an iterator on the cells of a complex column c. + * + * Calls to this method are allowed to return the same iterator object, and + * hence the returned object is only valid until the next getCell/getCells call + * on the same Row object. You will need to copy the returned data if you + * plan on using a reference to the Cell object longer than that. + * + * @param c the complex column for which to fetch the cells. + * @return an iterator on the cells of complex column {@code c} or {@code null} if the row has no + * cells for that column. + */ + public Iterator<Cell> getCells(ColumnDefinition c); + + /** + * Deletion informations for complex columns. + * + * @param c the complex column for which to fetch deletion info. + * @return the deletion time for complex column {@code c} in this row. 
+ */ + public DeletionTime getDeletion(ColumnDefinition c); + + /** + * An iterator over the cells of this row. + * + * The iterator guarantees that for 2 rows of the same partition, columns + * are returned in a consistent order in the sense that if the cells for + * column c1 are returned before the cells for column c2 by the first iterator, + * it is also the case for the 2nd iterator. + * + * The object returned by a call to next() is only guaranteed to be valid until + * the next call to hasNext() or next(). If a consumer wants to keep a + * reference on the returned Cell objects for longer than the iteration, it must + * make a copy of them explicitly. + * + * @return an iterator over the cells of this row. + */ + public Iterator<Cell> iterator(); + + /** + * An iterator to efficiently search data for a given column. + * + * @return a search iterator for the cells of this row. + */ + public SearchIterator<ColumnDefinition, ColumnData> searchIterator(); + + /** + * Copy this row to the provided writer. + * + * @param writer the row writer to write this row to. + */ + public void copyTo(Row.Writer writer); + + public String toString(CFMetaData metadata, boolean fullDetails); + + /** + * Interface for writing a row. + * <p> + * Clients of this interface should abide by the following assumptions: + * 1) if the row has a non empty clustering (it's not a static one and it doesn't belong to a table without + * clustering columns), then that clustering should be the first thing written (through + * {@link ClusteringPrefix.Writer#writeClusteringValue}). + * 2) for a given complex column, calls to {@link #writeCell} are performed consecutively (without + * any call to {@code writeCell} for another column intermingled) and in {@code CellPath} order. + * 3) {@link #endOfRow} is always called to end the writing of a given row. + */ + public interface Writer extends ClusteringPrefix.Writer + { + /** + * Writes the liveness information for the partition key columns of this row. 
+ * + * This call is optional: skipping it is equivalent to calling {@code writePartitionKeyLivenessInfo(LivenessInfo.NONE)}. + * + * @param info the liveness information for the partition key columns of the written row. + */ + public void writePartitionKeyLivenessInfo(LivenessInfo info); + + /** + * Writes the deletion information for this row. + * + * This call is optional and can be skipped if the row is not deleted. + * + * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted. + */ + public void writeRowDeletion(DeletionTime deletion); + + /** + * Writes a cell to the writer. + * + * As mentioned above, cells for a given column should be added consecutively (and in {@code CellPath} order for complex columns). + * + * @param column the column for the written cell. + * @param isCounter whether or not this is a counter cell. + * @param value the value for the cell. For tombstones, which don't have values, this should be an empty buffer. + * @param info the cell liveness information. + * @param path the {@link CellPath} for complex cells and {@code null} for regular cells. + */ + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path); + + /** + * Writes a deletion for a complex column, that is one that applies to all cells of the complex column. + * + * @param column the (complex) column this is a deletion for. + * @param complexDeletion the deletion time. + */ + public void writeComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion); + + /** + * Should be called to indicate that the row has been fully written. + */ + public void endOfRow(); + } + + /** + * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators). 
+ */ + public abstract static class Merger + { + private final CFMetaData metadata; + private final int nowInSec; + private final UnfilteredRowIterators.MergeListener listener; + private final Columns columns; + + private Clustering clustering; + private final Row[] rows; + private int rowsToMerge; + + private LivenessInfo rowInfo = LivenessInfo.NONE; + private DeletionTime rowDeletion = DeletionTime.LIVE; + + private final Cell[] cells; + private final List<Iterator<Cell>> complexCells; + private final ComplexColumnReducer complexReducer = new ComplexColumnReducer(); + + // For the sake of the listener if there is one + private final DeletionTime[] complexDelTimes; + + private boolean signaledListenerForRow; + + public static Merger createStatic(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + { + return new StaticMerger(metadata, size, nowInSec, columns, listener); + } + + public static Merger createRegular(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + { + return new RegularMerger(metadata, size, nowInSec, columns, listener); + } + + protected Merger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + { + this.metadata = metadata; + this.nowInSec = nowInSec; + this.listener = listener; + this.columns = columns; + this.rows = new Row[size]; + this.complexCells = new ArrayList<>(size); + + this.cells = new Cell[size]; + this.complexDelTimes = listener == null ? 
null : new DeletionTime[size]; + } + + public void clear() + { + Arrays.fill(rows, null); + Arrays.fill(cells, null); + if (complexDelTimes != null) + Arrays.fill(complexDelTimes, null); + complexCells.clear(); + rowsToMerge = 0; + + rowInfo = LivenessInfo.NONE; + rowDeletion = DeletionTime.LIVE; + + signaledListenerForRow = false; + } + + public void add(int i, Row row) + { + clustering = row.clustering(); + rows[i] = row; + ++rowsToMerge; + } + + protected abstract Row.Writer getWriter(); + protected abstract Row getRow(); + + public Row merge(DeletionTime activeDeletion) + { + // If for this clustering we have only one row version and have no activeDeletion (i.e. nothing to filter out), + // then we can just return that single row (we also should have no listener) + if (rowsToMerge == 1 && activeDeletion.isLive() && listener == null) + { + for (int i = 0; i < rows.length; i++) + if (rows[i] != null) + return rows[i]; + throw new AssertionError(); + } + + Row.Writer writer = getWriter(); + Rows.writeClustering(clustering, writer); + + for (int i = 0; i < rows.length; i++) + { + if (rows[i] == null) + continue; + + rowInfo = rowInfo.mergeWith(rows[i].primaryKeyLivenessInfo()); + + if (rows[i].deletion().supersedes(rowDeletion)) + rowDeletion = rows[i].deletion(); + } + + if (rowDeletion.supersedes(activeDeletion)) + activeDeletion = rowDeletion; + + if (activeDeletion.deletes(rowInfo)) + rowInfo = LivenessInfo.NONE; + + writer.writePartitionKeyLivenessInfo(rowInfo); + writer.writeRowDeletion(rowDeletion); + + for (int i = 0; i < columns.simpleColumnCount(); i++) + { + ColumnDefinition c = columns.getSimple(i); + for (int j = 0; j < rows.length; j++) + cells[j] = rows[j] == null ? 
null : rows[j].getCell(c); + + reconcileCells(activeDeletion, writer); + } + + complexReducer.activeDeletion = activeDeletion; + complexReducer.writer = writer; + for (int i = 0; i < columns.complexColumnCount(); i++) + { + ColumnDefinition c = columns.getComplex(i); + + DeletionTime maxComplexDeletion = DeletionTime.LIVE; + for (int j = 0; j < rows.length; j++) + { + if (rows[j] == null) + continue; + + DeletionTime dt = rows[j].getDeletion(c); + if (complexDelTimes != null) + complexDelTimes[j] = dt; + + if (dt.supersedes(maxComplexDeletion)) + maxComplexDeletion = dt; + } + + boolean overrideActive = maxComplexDeletion.supersedes(activeDeletion); + maxComplexDeletion = overrideActive ? maxComplexDeletion : DeletionTime.LIVE; + writer.writeComplexDeletion(c, maxComplexDeletion); + if (listener != null) + listener.onMergedComplexDeletion(c, maxComplexDeletion, complexDelTimes); + + mergeComplex(overrideActive ? maxComplexDeletion : activeDeletion, c); + } + writer.endOfRow(); + + Row row = getRow(); + // Because shadowed cells are skipped, the row could be empty. In which case + // we return null (we also don't want to signal anything in that case since that + // means everything in the row was shadowed and the listener will have been signalled + // for whatever shadows it). 
+ if (row.isEmpty()) + return null; + + maybeSignalEndOfRow(); + return row; + } + + private void maybeSignalListenerForRow() + { + if (listener != null && !signaledListenerForRow) + { + listener.onMergingRows(clustering, rowInfo, rowDeletion, rows); + signaledListenerForRow = true; + } + } + + private void maybeSignalListenerForCell(Cell merged, Cell[] versions) + { + if (listener != null) + { + maybeSignalListenerForRow(); + listener.onMergedCells(merged, versions); + } + } + + private void maybeSignalEndOfRow() + { + if (listener != null) + { + // If we haven't signaled the listener yet (we had no cells but some deletion info), do it now + maybeSignalListenerForRow(); + listener.onRowDone(); + } + } + + private void reconcileCells(DeletionTime activeDeletion, Row.Writer writer) + { + Cell reconciled = null; + for (int j = 0; j < cells.length; j++) + { + Cell cell = cells[j]; + if (cell != null && !activeDeletion.deletes(cell.livenessInfo())) + reconciled = Cells.reconcile(reconciled, cell, nowInSec); + } + + if (reconciled != null) + { + reconciled.writeTo(writer); + maybeSignalListenerForCell(reconciled, cells); + } + } + + private void mergeComplex(DeletionTime activeDeletion, ColumnDefinition c) + { + complexCells.clear(); + for (int j = 0; j < rows.length; j++) + { + Row row = rows[j]; + Iterator<Cell> iter = row == null ? null : row.getCells(c); + complexCells.add(iter == null ? Iterators.<Cell>emptyIterator() : iter); + } + + complexReducer.column = c; + complexReducer.activeDeletion = activeDeletion; + + // Note that we use the mergeIterator only to group cells to merge, but we + // write the result to the writer directly in the reducer, so all we care + // about is iterating over the result. 
+ Iterator<Void> iter = MergeIterator.get(complexCells, c.cellComparator(), complexReducer); + while (iter.hasNext()) + iter.next(); + } + + private class ComplexColumnReducer extends MergeIterator.Reducer<Cell, Void> + { + private DeletionTime activeDeletion; + private Row.Writer writer; + private ColumnDefinition column; + + public void reduce(int idx, Cell current) + { + cells[idx] = current; + } + + protected Void getReduced() + { + reconcileCells(activeDeletion, writer); + return null; + } + + protected void onKeyChange() + { + Arrays.fill(cells, null); + } + } + + private static class StaticMerger extends Merger + { + private final StaticRow.Builder builder; + + private StaticMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + { + super(metadata, size, nowInSec, columns, listener); + this.builder = StaticRow.builder(columns, true, metadata.isCounter()); + } + + protected Row.Writer getWriter() + { + return builder; + } + + protected Row getRow() + { + return builder.build(); + } + } + + private static class RegularMerger extends Merger + { + private final ReusableRow row; + + private RegularMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + { + super(metadata, size, nowInSec, columns, listener); + this.row = new ReusableRow(metadata.clusteringColumns().size(), columns, true, metadata.isCounter()); + } + + protected Row.Writer getWriter() + { + return row.writer(); + } + + protected Row getRow() + { + return row; + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java new file mode 100644 index 0000000..51383a2 
--- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.Comparator; +import java.util.Iterator; + +import com.google.common.collect.PeekingIterator; +import com.google.common.collect.UnmodifiableIterator; + +import org.apache.cassandra.db.*; + +public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered> +{ + private final ClusteringComparator clusteringComparator; + private final Comparator<Clusterable> comparator; + private final boolean reversed; + + private Iterator<Row> rowIter; + private Row nextRow; + + private Iterator<RangeTombstone> tombstoneIter; + private RangeTombstone nextTombstone; + private boolean inTombstone; + + private Unfiltered next; + + public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed) + { + this.clusteringComparator = comparator; + this.comparator = reversed ? 
comparator.reversed() : comparator; + this.reversed = reversed; + } + + public RowAndTombstoneMergeIterator setTo(Iterator<Row> rowIter, Iterator<RangeTombstone> tombstoneIter) + { + this.rowIter = rowIter; + this.tombstoneIter = tombstoneIter; + this.nextRow = null; + this.nextTombstone = null; + this.next = null; + this.inTombstone = false; + return this; + } + + public boolean isSet() + { + return rowIter != null; + } + + private void prepareNext() + { + if (next != null) + return; + + if (nextTombstone == null && tombstoneIter.hasNext()) + nextTombstone = tombstoneIter.next(); + if (nextRow == null && rowIter.hasNext()) + nextRow = rowIter.next(); + + if (nextTombstone == null) + { + if (nextRow == null) + return; + + next = nextRow; + nextRow = null; + } + else if (nextRow == null) + { + if (inTombstone) + { + RangeTombstone rt = nextTombstone; + nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null; + if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed))) + { + next = RangeTombstoneBoundaryMarker.makeBoundary(reversed, + rt.deletedSlice().close(reversed), + nextTombstone.deletedSlice().open(reversed), + rt.deletionTime(), + nextTombstone.deletionTime()); + } + else + { + inTombstone = false; + next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime()); + } + } + else + { + inTombstone = true; + next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime()); + } + } + else if (inTombstone) + { + if (comparator.compare(nextTombstone.deletedSlice().close(reversed), nextRow.clustering()) < 0) + { + RangeTombstone rt = nextTombstone; + nextTombstone = tombstoneIter.hasNext() ? 
tombstoneIter.next() : null; + if (nextTombstone != null && RangeTombstoneBoundaryMarker.isBoundary(clusteringComparator, rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed))) + { + next = RangeTombstoneBoundaryMarker.makeBoundary(reversed, + rt.deletedSlice().close(reversed), + nextTombstone.deletedSlice().open(reversed), + rt.deletionTime(), + nextTombstone.deletionTime()); + } + else + { + inTombstone = false; + next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime()); + } + } + else + { + next = nextRow; + nextRow = null; + } + } + else + { + if (comparator.compare(nextTombstone.deletedSlice().open(reversed), nextRow.clustering()) < 0) + { + inTombstone = true; + next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime()); + } + else + { + next = nextRow; + nextRow = null; + } + } + } + + public boolean hasNext() + { + prepareNext(); + return next != null; + } + + public Unfiltered next() + { + prepareNext(); + Unfiltered toReturn = next; + next = null; + return toReturn; + } + + public Unfiltered peek() + { + // Return the prepared element WITHOUT consuming it. The previous + // implementation delegated to next(), which cleared the cached 'next' + // element, so a subsequent call to next() would silently skip the + // peeked item — violating the PeekingIterator contract. + prepareNext(); + return next; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowDataBlock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java new file mode 100644 index 0000000..b1e2b13 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.UnmodifiableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * A {@code RowDataBlock} holds data for one or more row (of a given table). More precisely, it contains + * cell data and complex deletion data (for complex columns) and allow access to this data. Please note + * however that {@code RowDataBlock} only holds the data inside the row, it does not hold the data + * pertaining to the row itself: clustering, partition key liveness info and row deletion. + * <p> + * {@code RowDataBlock} is largely an implementation detail: it is only there to be reused by + * {@link AbstractPartitionData} and every concrete row implementation. + */ +public class RowDataBlock +{ + private static final Logger logger = LoggerFactory.getLogger(RowDataBlock.class); + + private static final long EMPTY_SIZE = ObjectSizes.measure(new RowDataBlock(Columns.NONE, 0, false, false)); + + // We distinguish 2 sub-objects: SimpleRowDataBlock that contains the data for the simple columns only, + // and ComplexRowDataBlock that only contains data for complex columns. 
The reason for having 2 separate + // objects is that simple columns are much easier to handle since we have only a single cell per-object + // and thus having a more specialized object allow a simpler and more efficient handling. + final SimpleRowDataBlock simpleData; + final ComplexRowDataBlock complexData; + + public RowDataBlock(Columns columns, int rows, boolean sortable, boolean isCounter) + { + this.simpleData = columns.hasSimple() ? new SimpleRowDataBlock(columns, rows, isCounter) : null; + this.complexData = columns.hasComplex() ? ComplexRowDataBlock.create(columns, rows, sortable, isCounter) : null; + } + + public Columns columns() + { + if (simpleData != null) + return simpleData.columns(); + if (complexData != null) + return complexData.columns(); + return Columns.NONE; + } + + /** + * Return the cell value for a given column of a given row. + * + * @param row the row for which to return the cell value. + * @param column the column for which to return the cell value. + * @param path the cell path for which to return the cell value. Can be null for + * simple columns. + * + * @return the value of the cell of path {@code path} for {@code column} in row {@code row}, or + * {@code null} if their is no such cell. + */ + public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path) + { + if (column.isComplex()) + { + return complexData.getValue(row, column, path); + } + else + { + int idx = columns().simpleIdx(column, 0); + assert idx >= 0; + return simpleData.data.value((row * columns().simpleColumnCount()) + idx); + } + } + + /** + * Sets the cell value for a given simple column of a given row. + * + * @param row the row for which to set the cell value. + * @param column the simple column for which to set the cell value. + * @param path the cell path for which to return the cell value. Can be null for + * simple columns. + * @param value the value to set. 
+ */ + public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value) + { + if (column.isComplex()) + { + complexData.setValue(row, column, path, value); + } + else + { + int idx = columns().simpleIdx(column, 0); + assert idx >= 0; + simpleData.data.setValue((row * columns().simpleColumnCount()) + idx, value); + } + } + + public static ReusableIterator reusableIterator() + { + return new ReusableIterator(); + } + + // Swap row i and j + public void swap(int i, int j) + { + if (simpleData != null) + simpleData.swap(i, j); + if (complexData != null) + complexData.swap(i, j); + } + + // Merge row i into j + public void merge(int i, int j, int nowInSec) + { + if (simpleData != null) + simpleData.merge(i, j, nowInSec); + if (complexData != null) + complexData.merge(i, j, nowInSec); + } + + // Move row i into j + public void move(int i, int j) + { + if (simpleData != null) + simpleData.move(i, j); + if (complexData != null) + complexData.move(i, j); + } + + public boolean hasComplexDeletion(int row) + { + return complexData != null && complexData.hasComplexDeletion(row); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + + (simpleData == null ? 0 : simpleData.unsharedHeapSizeExcludingData()) + + (complexData == null ? 0 : complexData.unsharedHeapSizeExcludingData()); + } + + public static int computeNewCapacity(int currentCapacity, int idxToSet) + { + int newCapacity = currentCapacity == 0 ? 4 : currentCapacity; + while (idxToSet >= newCapacity) + newCapacity = 1 + (newCapacity * 3) / 2; + return newCapacity; + } + + public int dataSize() + { + return (simpleData == null ? 0 : simpleData.dataSize()) + + (complexData == null ? 
0 : complexData.dataSize()); + } + + public void clear() + { + if (simpleData != null) + simpleData.clear(); + if (complexData != null) + complexData.clear(); + } + + public abstract static class Writer implements Row.Writer + { + private final boolean inOrderCells; + + protected int row; + + protected SimpleRowDataBlock.CellWriter simpleWriter; + protected ComplexRowDataBlock.CellWriter complexWriter; + + protected Writer(boolean inOrderCells) + { + this.inOrderCells = inOrderCells; + } + + protected Writer(RowDataBlock data, boolean inOrderCells) + { + this(inOrderCells); + updateWriter(data); + } + + protected void updateWriter(RowDataBlock data) + { + this.simpleWriter = data.simpleData == null ? null : data.simpleData.cellWriter(inOrderCells); + this.complexWriter = data.complexData == null ? null : data.complexData.cellWriter(inOrderCells); + } + + public Writer reset() + { + row = 0; + + if (simpleWriter != null) + simpleWriter.reset(); + if (complexWriter != null) + complexWriter.reset(); + + return this; + } + + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + if (column.isComplex()) + complexWriter.addCell(column, value, info, path); + else + simpleWriter.addCell(column, value, info); + } + + public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion) + { + if (complexDeletion.isLive()) + return; + + complexWriter.setComplexDeletion(c, complexDeletion); + } + + public void endOfRow() + { + ++row; + if (simpleWriter != null) + simpleWriter.endOfRow(); + if (complexWriter != null) + complexWriter.endOfRow(); + } + } + + static class ReusableIterator extends UnmodifiableIterator<Cell> implements Iterator<Cell> + { + private SimpleRowDataBlock.ReusableIterator simpleIterator; + private ComplexRowDataBlock.ReusableIterator complexIterator; + + public ReusableIterator() + { + this.simpleIterator = SimpleRowDataBlock.reusableIterator(); + this.complexIterator = 
ComplexRowDataBlock.reusableIterator(); + } + + public ReusableIterator setTo(RowDataBlock dataBlock, int row) + { + simpleIterator.setTo(dataBlock.simpleData, row); + complexIterator.setTo(dataBlock.complexData, row); + return this; + } + + public boolean hasNext() + { + return simpleIterator.hasNext() || complexIterator.hasNext(); + } + + public Cell next() + { + return simpleIterator.hasNext() ? simpleIterator.next() : complexIterator.next(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java b/src/java/org/apache/cassandra/db/rows/RowIterator.java new file mode 100644 index 0000000..69994dd --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * An iterator over rows belonging to a partition. 
 *
 * A RowIterator is an UnfilteredRowIterator from which all deletion information has been
 * filtered out. As such, all cells of all rows returned by this iterator are,
 * by definition, live, and hence code using a RowIterator doesn't have to worry
 * about tombstones and other deletion information.
 *
 * Note that as for UnfilteredRowIterator, the rows returned must be in clustering order (or
 * reverse clustering order if isReverseOrder is true), and the Row objects returned
 * by next() are only valid until the next call to hasNext() or next().
 */
public interface RowIterator extends Iterator<Row>, AutoCloseable
{
    /**
     * The metadata for the table this is an iterator on.
     */
    public CFMetaData metadata();

    /**
     * Whether or not the rows returned by this iterator are in reversed
     * clustering order.
     */
    public boolean isReverseOrder();

    /**
     * A subset of the columns for the (static and regular) rows returned by this iterator.
     * Every row returned by this iterator must guarantee that it has only those columns.
     */
    public PartitionColumns columns();

    /**
     * The partition key of the partition this is an iterator over.
     */
    public DecoratedKey partitionKey();

    /**
     * The static part corresponding to this partition (this can be an empty
     * row).
     */
    public Row staticRow();

    // Redeclared without AutoCloseable's checked exception so callers need no
    // try/catch around close().
    public void close();

    /**
     * Returns whether the provided iterator has no data.
     */
    public default boolean isEmpty()
    {
        // Empty means: nothing in the static row and no regular rows to return.
        return staticRow().isEmpty() && !hasNext();
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java
new file mode 100644
index 0000000..a3bd913
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.rows;

import java.util.*;
import java.security.MessageDigest;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.utils.FBUtilities;

/**
 * Static methods to work with row iterators.
 */
public abstract class RowIterators
{
    private static final Logger logger = LoggerFactory.getLogger(RowIterators.class);

    // Static-methods-only holder; never instantiated.
    private RowIterators() {}

    /**
     * Materializes the content of the provided iterator as a PartitionUpdate,
     * copying the static row (when non-empty) and then every row in iteration
     * order. Consumes the iterator.
     */
    public static PartitionUpdate toUpdate(RowIterator iterator)
    {
        PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1);

        if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW)
            iterator.staticRow().copyTo(update.staticWriter());

        while (iterator.hasNext())
            iterator.next().copyTo(update.writer());

        return update;
    }

    /**
     * Folds the iterator's content (partition key, columns, ordering flag,
     * static row, then every row) into the provided digest. Consumes the
     * iterator.
     */
    public static void digest(RowIterator iterator, MessageDigest digest)
    {
        // TODO: we're not computing digest the same way that old nodes so we'll need
        // to pass the version we're computing the digest for and deal with that.
        digest.update(iterator.partitionKey().getKey().duplicate());
        iterator.columns().digest(digest);
        FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder());
        iterator.staticRow().digest(digest);

        while (iterator.hasNext())
            iterator.next().digest(digest);
    }

    /**
     * Returns a RowIterator with no rows (and an empty static row) for the
     * given table, partition key and ordering.
     */
    public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder)
    {
        return new RowIterator()
        {
            public CFMetaData metadata()
            {
                return cfm;
            }

            public boolean isReverseOrder()
            {
                return isReverseOrder;
            }

            public PartitionColumns columns()
            {
                return PartitionColumns.NONE;
            }

            public DecoratedKey partitionKey()
            {
                return partitionKey;
            }

            public Row staticRow()
            {
                return Rows.EMPTY_STATIC_ROW;
            }

            public boolean hasNext()
            {
                return false;
            }

            public Row next()
            {
                throw new NoSuchElementException();
            }

            public void remove()
            {
                throw new UnsupportedOperationException();
            }

            public void close()
            {
                // Nothing to release for an empty iterator.
            }
        };
    }

    /**
     * Wraps the provided iterator so it logs the returned rows for debugging purposes.
     * <p>
     * Note that this is only meant for debugging as this can log a very large amount of
     * logging at INFO.
     */
    public static RowIterator loggingIterator(RowIterator iterator, final String id)
    {
        CFMetaData metadata = iterator.metadata();
        // Logged once up front so subsequent per-row lines can be correlated via id.
        logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}",
                    new Object[]{ id,
                                  metadata.ksName,
                                  metadata.cfName,
                                  metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
                                  iterator.isReverseOrder() });

        return new WrappingRowIterator(iterator)
        {
            @Override
            public Row staticRow()
            {
                Row row = super.staticRow();
                // Only log the static row when it actually carries data.
                if (!row.isEmpty())
                    logger.info("[{}] {}", id, row.toString(metadata()));
                return row;
            }

            @Override
            public Row next()
            {
                Row next = super.next();
                logger.info("[{}] {}", id, next.toString(metadata()));
                return next;
            }
        };
    }
}
