http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java new file mode 100644 index 0000000..ca1e424 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -0,0 +1,764 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Sorting; + +/** + * Stores updates made on a partition. + * <p> + * A PartitionUpdate object requires that all writes are performed before we + * try to read the updates (attempts to write to the PartitionUpdate after a + * read method has been called will result in an exception being thrown). + * In other words, a PartitionUpdate is mutable while we do writes and becomes + * immutable as soon as it is read. + * <p> + * Row updates are added to this update through the {@link #writer} method which + * returns a {@link Row.Writer}. Multiple rows can be added to this writer as required and + * those rows do not have to be added in (clustering) order, and the same row can be added + * multiple times. Further, for a given row, the writer actually supports intermingling + * the writing of cells for different complex columns (note that this is usually not supported + * by {@code Row.Writer} implementations, but is supported here because + * {@code ModificationStatement} requires it (we could have multiple {@link Operation}s + * on the same column in a given statement)).
+ */ +public class PartitionUpdate extends AbstractPartitionData implements Sorting.Sortable +{ + protected static final Logger logger = LoggerFactory.getLogger(PartitionUpdate.class); + + // Records whether the partition update has been sorted (it is the rows contained in the partition + // that are sorted since we don't require rows to be added in order). Sorting happens when the + // update is read, and writing is rejected as soon as the update is sorted (it's actually possible + // to manually allow new updates by using allowNewUpdates(), and we could make that more implicit, but + // as only triggers really require it, we keep it simple for now). + private boolean isSorted; + + public static final PartitionUpdateSerializer serializer = new PartitionUpdateSerializer(); + + private final Writer writer; + + // Used by compare for the sake of implementing the Sorting.Sortable interface (which is in turn used + // to sort the rows of this update). + private final InternalReusableClustering p1 = new InternalReusableClustering(); + private final InternalReusableClustering p2 = new InternalReusableClustering(); + + private PartitionUpdate(CFMetaData metadata, + DecoratedKey key, + DeletionInfo delInfo, + RowDataBlock data, + PartitionColumns columns, + int initialRowCapacity) + { + super(metadata, key, delInfo, columns, data, initialRowCapacity); + this.writer = createWriter(); + } + + public PartitionUpdate(CFMetaData metadata, + DecoratedKey key, + DeletionInfo delInfo, + PartitionColumns columns, + int initialRowCapacity) + { + this(metadata, + key, + delInfo, + new RowDataBlock(columns.regulars, initialRowCapacity, true, metadata.isCounter()), + columns, + initialRowCapacity); + } + + public PartitionUpdate(CFMetaData metadata, + DecoratedKey key, + PartitionColumns columns, + int initialRowCapacity) + { + this(metadata, + key, + DeletionInfo.live(), + columns, + initialRowCapacity); + } + + protected Writer createWriter() + { + return new RegularWriter(); + } + + protected StaticWriter createStaticWriter() + { + return new StaticWriter(); + } + + /** + * Deserialize a partition update from a provided byte buffer. + * + * @param bytes the byte buffer that contains the serialized update. + * @param version the version with which the update is serialized. + * @param key the partition key for the update. This is only used if {@code version < 3.0} + * and can be {@code null} otherwise. + * + * @return the deserialized update or {@code null} if {@code bytes == null}. + */ + public static PartitionUpdate fromBytes(ByteBuffer bytes, int version, DecoratedKey key) + { + if (bytes == null) + return null; + + try + { + return serializer.deserialize(new DataInputStream(ByteBufferUtil.inputStream(bytes)), + version, + SerializationHelper.Flag.LOCAL, + version < MessagingService.VERSION_30 ? key : null); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /** + * Serialize a partition update as a byte buffer. + * + * @param update the partition update to serialize. + * @param version the version to serialize the update into. + * + * @return a newly allocated byte buffer containing the serialized update.
+ */ + public static ByteBuffer toBytes(PartitionUpdate update, int version) + { + try (DataOutputBuffer out = new DataOutputBuffer()) + { + serializer.serialize(update, out, version); + return ByteBuffer.wrap(out.getData(), 0, out.getLength()); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + + /** + * Creates an empty immutable partition update. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the created update. + * + * @return the newly created empty (and immutable) update. + */ + public static PartitionUpdate emptyUpdate(CFMetaData metadata, DecoratedKey key) + { + return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 0) + { + public Row.Writer staticWriter() + { + throw new UnsupportedOperationException(); + } + + public Row.Writer writer() + { + throw new UnsupportedOperationException(); + } + + public void addPartitionDeletion(DeletionTime deletionTime) + { + throw new UnsupportedOperationException(); + } + + public void addRangeTombstone(RangeTombstone range) + { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Creates a partition update that entirely deletes a given partition. + * + * @param metadata the metadata for the created update. + * @param key the partition key for the partition that the created update should delete. + * @param timestamp the timestamp for the deletion. + * @param nowInSec the current time in seconds to use as local deletion time for the partition deletion. + * + * @return the newly created partition deletion update. + */ + public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, DecoratedKey key, long timestamp, int nowInSec) + { + return new PartitionUpdate(metadata, + key, + new DeletionInfo(timestamp, nowInSec), + new RowDataBlock(Columns.NONE, 0, true, metadata.isCounter()), + PartitionColumns.NONE, + 0); + } + + /** + * Merges the provided updates, yielding a new update that incorporates all those updates. + * + * @param updates the collection of updates to merge. This shouldn't be empty. + * + * @return a partition update that includes (merges) all the updates from {@code updates}.
+ */ + public static PartitionUpdate merge(Collection<PartitionUpdate> updates) + { + assert !updates.isEmpty(); + if (updates.size() == 1) + return Iterables.getOnlyElement(updates); + + int totalSize = 0; + PartitionColumns.Builder builder = PartitionColumns.builder(); + DecoratedKey key = null; + CFMetaData metadata = null; + for (PartitionUpdate update : updates) + { + totalSize += update.rows; + builder.addAll(update.columns()); + + if (key == null) + key = update.partitionKey(); + else + assert key.equals(update.partitionKey()); + + if (metadata == null) + metadata = update.metadata(); + else + assert metadata.cfId.equals(update.metadata().cfId); + } + + // Used when merging rows to decide liveness + int nowInSec = FBUtilities.nowInSeconds(); + PartitionUpdate newUpdate = new PartitionUpdate(metadata, key, builder.build(), totalSize); + for (PartitionUpdate update : updates) + { + newUpdate.deletionInfo.add(update.deletionInfo); + if (!update.staticRow().isEmpty()) + { + if (newUpdate.staticRow().isEmpty()) + newUpdate.staticRow = update.staticRow().takeAlias(); + else + Rows.merge(newUpdate.staticRow(), update.staticRow(), newUpdate.columns().statics, newUpdate.staticWriter(), nowInSec, SecondaryIndexManager.nullUpdater); + } + for (Row row : update) + row.copyTo(newUpdate.writer); + } + return newUpdate; + } + + /** + * The number of "operations" contained in the update. + * <p> + * This is used by {@code Memtable} to approximate how much work this update does. In practice, this + * counts how many rows are updated and how many ranges are deleted by the partition update. + * + * @return the number of "operations" performed by the update. + */ + public int operationCount() + { + return rowCount() + + deletionInfo.rangeCount() + + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1); + } + + /** + * The size of the data contained in this update. + * + * @return the size of the data contained in this update. + */ + public int dataSize() + { + int size = 0; + for (Row row : this) + { + size += row.clustering().dataSize(); + for (Cell cell : row) + size += cell.dataSize(); + } + return size; + } + + /** + * If a partition update has been read (and is thus unmodifiable), a call to this method + * makes the update modifiable again. + * <p> + * Please note that calling this method won't result in optimal behavior in the sense that + * even if very little is added to the update after this call, the whole update will be sorted + * again on read. This should thus be used sparingly (and if it turns out that we end up using + * this often, we should consider optimizing the behavior). + */ + public synchronized void allowNewUpdates() + { + // This is synchronized to make extra sure things work properly even if this is + // called concurrently with sort() (which should be avoided in the first place, but + // better safe than sorry). + isSorted = false; + } + + /** + * Returns an iterator that iterates over the rows of this update in clustering order. + * <p> + * Note that this might trigger a sorting of the update, and as such the update will not + * be modifiable anymore after this call. + * + * @return an iterator over the rows of this update.
+ */ + @Override + public Iterator<Row> iterator() + { + maybeSort(); + return super.iterator(); + } + + @Override + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) + { + maybeSort(); + return super.sliceableUnfilteredIterator(columns, reversed); + } + + /** + * Validates the data contained in this update. + * + * @throws MarshalException if some of the data contained in this update is corrupted. + */ + public void validate() + { + for (Row row : this) + { + metadata().comparator.validate(row.clustering()); + for (Cell cell : row) + cell.validate(); + } + } + + /** + * The maximum timestamp used in this update. + * + * @return the maximum timestamp used in this update. + */ + public long maxTimestamp() + { + return maxTimestamp; + } + + /** + * For an update on a counter table, returns a list containing a {@code CounterMark} for + * every counter contained in the update. + * + * @return a list with counter marks for every counter in this update. + */ + public List<CounterMark> collectCounterMarks() + { + assert metadata().isCounter(); + + InternalReusableClustering clustering = new InternalReusableClustering(); + List<CounterMark> l = new ArrayList<>(); + int i = 0; + for (Row row : this) + { + for (Cell cell : row) + if (cell.isCounterCell()) + l.add(new CounterMark(clustering, i, cell.column(), cell.path())); + i++; + } + return l; + } + + /** + * Returns a row writer for the static row of this partition update. + * + * @return a row writer for the static row of this partition update. A partition + * update contains only one static row so only one row should be written through + * this writer (but if multiple rows are added, the latest written one wins). + */ + public Row.Writer staticWriter() + { + return createStaticWriter(); + } + + /** + * Returns a row writer to add (non-static) rows to this partition update. + * + * @return a row writer to add (non-static) rows to this partition update. + * Multiple rows can be successively added this way and the rows added do not have + * to be in clustering order. Further, the same row can be added multiple times. + * + */ + public Row.Writer writer() + { + if (isSorted) + throw new IllegalStateException("An update should not be written again once it has been read"); + + return writer; + } + + /** + * Returns a range tombstone marker writer to add range tombstones to this + * partition update. + * <p> + * Note that if more convenient, range tombstones can also be added using + * {@link #addRangeTombstone}. + * + * @param isReverseOrder whether the range tombstone markers will be provided to the returned writer + * in clustering order or in reverse clustering order. + * @return a range tombstone marker writer to add range tombstones to this update. + */ + public RangeTombstoneMarker.Writer markerWriter(boolean isReverseOrder) + { + return new RangeTombstoneCollector(isReverseOrder); + } + + /** + * The number of rows contained in this update. + * + * @return the number of rows contained in this update.
+ */ + public int size() + { + return rows; + } + + private void maybeSort() + { + if (isSorted) + return; + + sort(); + } + + private synchronized void sort() + { + if (isSorted) + return; + + if (rows <= 1) + { + isSorted = true; + return; + } + + // Sort the rows - will still potentially contain duplicate (non-reconciled) rows + Sorting.sort(this); + + // Now find duplicates and merge them together + int previous = 0; // The last element that was set + int nowInSec = FBUtilities.nowInSeconds(); + for (int current = 1; current < rows; current++) + { + // There are really only 2 possible comparisons: < 0 or == 0 since we've sorted already + int cmp = compare(previous, current); + if (cmp == 0) + { + // current and previous are the same row. Merge current into previous + // (and so previous + 1 will be "free"). + data.merge(current, previous, nowInSec); + } + else + { + // data[current] != data[previous], so move current just after previous if need be + ++previous; + if (previous != current) + data.move(current, previous); + } + } + + // previous is on the last value to keep + rows = previous + 1; + + isSorted = true; + } + + /** + * This method is not meant to be used externally: it is only public so this + * update conforms to the {@link Sorting.Sortable} interface. + */ + public int compare(int i, int j) + { + return metadata.comparator.compare(p1.setTo(i), p2.setTo(j)); + } + + protected class StaticWriter extends StaticRow.Builder + { + protected StaticWriter() + { + super(columns.statics, false, metadata().isCounter()); + } + + @Override + public void endOfRow() + { + super.endOfRow(); + if (staticRow == null) + { + staticRow = build(); + } + else + { + StaticRow.Builder builder = StaticRow.builder(columns.statics, true, metadata().isCounter()); + Rows.merge(staticRow, build(), columns.statics, builder, FBUtilities.nowInSeconds()); + staticRow = builder.build(); + } + } + } + + protected class RegularWriter extends Writer + { + // For complex columns, the writer's assumption is that for a given row, cells of different + // complex columns are not intermingled (they also should be in cellPath order). We however + // don't yet guarantee that this will be the case for updates (both UpdateStatement and + // RowUpdateBuilder can potentially break that assumption; we could change those classes but + // that's non trivial, at least for UpdateStatement). + // To deal with that problem, we record which complex columns have been updated (for the current + // row) and if we detect a violation of our assumption, we switch the row we're writing + // into (which is ok because everything will be sorted and merged in maybeSort()). + private final Set<ColumnDefinition> updatedComplex = new HashSet<>(); + private ColumnDefinition lastUpdatedComplex; + private CellPath lastUpdatedComplexPath; + + public RegularWriter() + { + super(false); + } + + @Override + public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) + { + if (column.isComplex()) + { + if (updatedComplex.contains(column) + && (!column.equals(lastUpdatedComplex) || (column.cellPathComparator().compare(path, lastUpdatedComplexPath)) <= 0)) + { + // We've updated that complex column already, but we've either updated another complex column since or it's not in order: as this + // breaks the writer assumption, switch rows.
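+ // (endOfRow() below closes the in-progress row; we then re-open a new row with the same + // clustering values and rely on the sort/merge done in maybeSort() to reconcile the two fragments.)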
+ endOfRow(); + + // Copy the clustering values from the previous row + int clusteringSize = metadata.clusteringColumns().size(); + int base = (row - 1) * clusteringSize; + for (int i = 0; i < clusteringSize; i++) + writer.writeClusteringValue(clusterings[base + i]); + + updatedComplex.clear(); + } + + lastUpdatedComplex = column; + lastUpdatedComplexPath = path; + updatedComplex.add(column); + } + super.writeCell(column, isCounter, value, info, path); + } + + @Override + public void endOfRow() + { + super.endOfRow(); + clear(); + } + + @Override + public Writer reset() + { + super.reset(); + clear(); + return this; + } + + private void clear() + { + updatedComplex.clear(); + lastUpdatedComplex = null; + lastUpdatedComplexPath = null; + } + } + + public static class PartitionUpdateSerializer + { + public void serialize(PartitionUpdate update, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + { + // TODO + throw new UnsupportedOperationException(); + + // if (cf == null) + // { + // out.writeBoolean(false); + // return; + // } + + // out.writeBoolean(true); + // serializeCfId(cf.id(), out, version); + // cf.getComparator().deletionInfoSerializer().serialize(cf.deletionInfo(), out, version); + // ColumnSerializer columnSerializer = cf.getComparator().columnSerializer(); + // int count = cf.getColumnCount(); + // out.writeInt(count); + // int written = 0; + // for (Cell cell : cf) + // { + // columnSerializer.serialize(cell, out); + // written++; + // } + // assert count == written: "Table had " + count + " columns, but " + written + " written"; + } + + try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) + { + assert !iter.isReverseOrder(); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, version, update.rows); + } + } + + public PartitionUpdate deserialize(DataInput in, int version, SerializationHelper.Flag flag, DecoratedKey key) throws IOException + { + if (version < MessagingService.VERSION_30) + { + assert key != null; + + // This is only used in mutations, and mutations have never allowed "null" column families + boolean present = in.readBoolean(); + assert present; + + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version); + int size = in.readInt(); + Iterator<LegacyLayout.LegacyCell> cells = LegacyLayout.deserializeCells(metadata, in, flag, size); + SerializationHelper helper = new SerializationHelper(version, flag); + try (UnfilteredRowIterator iterator = LegacyLayout.onWireCellstoUnfilteredRowIterator(metadata, key, info, cells, false, helper)) + { + return UnfilteredRowIterators.toUpdate(iterator); + } + } + + assert key == null; // key is only there for the old format + + UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, flag); + if (h.isEmpty) + return emptyUpdate(h.metadata, h.key); + + assert !h.isReversed; + assert h.rowEstimate >= 0; + PartitionUpdate upd = new PartitionUpdate(h.metadata, + h.key, + new DeletionInfo(h.partitionDeletion), + new RowDataBlock(h.sHeader.columns().regulars, h.rowEstimate, false, h.metadata.isCounter()), + h.sHeader.columns(), + h.rowEstimate); + + upd.staticRow = h.staticRow; + + RangeTombstoneMarker.Writer markerWriter = upd.markerWriter(false); + UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(version, flag), h.sHeader,
upd.writer(), markerWriter); + + // Mark sorted after we've read it all since that's what we use in the writer() method to detect bad uses + upd.isSorted = true; + + return upd; + } + + public long serializedSize(PartitionUpdate update, int version, TypeSizes sizes) + { + if (version < MessagingService.VERSION_30) + { + // TODO + throw new UnsupportedOperationException("Version is " + version); + //if (cf == null) + //{ + // return typeSizes.sizeof(false); + //} + //else + //{ + // return typeSizes.sizeof(true) /* nullness bool */ + // + cfIdSerializedSize(cf.id(), typeSizes, version) /* id */ + // + contentSerializedSize(cf, typeSizes, version); + //} + } + + try (UnfilteredRowIterator iter = update.sliceableUnfilteredIterator()) + { + return UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, version, update.rows, sizes); + } + } + } + + /** + * A counter mark is basically a pointer to a counter update inside this partition update. That pointer allows + * us to update the counter value based on the pre-existing value read during the read-before-write that counters + * do. See {@link CounterMutation} to understand how this is used. + */ + public class CounterMark + { + private final InternalReusableClustering clustering; + private final int row; + private final ColumnDefinition column; + private final CellPath path; + + private CounterMark(InternalReusableClustering clustering, int row, ColumnDefinition column, CellPath path) + { + this.clustering = clustering; + this.row = row; + this.column = column; + this.path = path; + } + + public Clustering clustering() + { + return clustering.setTo(row); + } + + public ColumnDefinition column() + { + return column; + } + + public CellPath path() + { + return path; + } + + public ByteBuffer value() + { + return data.getValue(row, column, path); + } + + public void setValue(ByteBuffer value) + { + data.setValue(row, column, path, value); + } + } +}
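A minimal usage sketch of the write-then-read lifecycle described in the class javadoc above (illustrative only, not part of the patch): it assumes a `CFMetaData metadata`, `DecoratedKey key`, a `ByteBuffer clusteringValue` and `val`, a `ColumnDefinition col` and a `LivenessInfo info` are in scope, that `CFMetaData.partitionColumns()` is available, and it uses only the `Row.Writer` methods this patch itself relies on (`writeClusteringValue`, `writeCell`, `endOfRow`):

    // Build an update; rows may be written in any clustering order and the same
    // row may be written several times (duplicates are merged when sorting).
    PartitionUpdate update = new PartitionUpdate(metadata, key, metadata.partitionColumns(), 4);
    Row.Writer writer = update.writer();
    writer.writeClusteringValue(clusteringValue);  // assumed clustering value
    writer.writeCell(col, false, val, info, null); // simple (non-complex) cell, so no CellPath
    writer.endOfRow();

    // The first read triggers the one-time sort/merge and freezes the update.
    for (Row row : update)
        System.out.println(row);

    // writer() would now throw IllegalStateException; allowNewUpdates() re-opens
    // the update at the cost of a full re-sort on the next read.
    update.allowNewUpdates();

Several updates to the same partition can then be combined with PartitionUpdate.merge(updates), which reconciles rows, static rows and deletion info as shown in merge() above.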
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java new file mode 100644 index 0000000..e2fec05 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/SingletonUnfilteredPartitionIterator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.NoSuchElementException; + +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +public class SingletonUnfilteredPartitionIterator implements UnfilteredPartitionIterator +{ + private final UnfilteredRowIterator iter; + private final boolean isForThrift; + private boolean returned; + + public SingletonUnfilteredPartitionIterator(UnfilteredRowIterator iter, boolean isForThrift) + { + this.iter = iter; + this.isForThrift = isForThrift; + } + + public boolean isForThrift() + { + return isForThrift; + } + + public boolean hasNext() + { + return !returned; + } + + public UnfilteredRowIterator next() + { + if (returned) + throw new NoSuchElementException(); + + returned = true; + return iter; + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + iter.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java new file mode 100644 index 0000000..10022eb --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/TombstonePurgingPartitionIterator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; + +public abstract class TombstonePurgingPartitionIterator extends FilteringPartitionIterator +{ + private final int gcBefore; + + public TombstonePurgingPartitionIterator(UnfilteredPartitionIterator iterator, int gcBefore) + { + super(iterator); + this.gcBefore = gcBefore; + } + + protected abstract long getMaxPurgeableTimestamp(); + + protected FilteringRow makeRowFilter() + { + return new FilteringRow() + { + @Override + protected boolean include(LivenessInfo info) + { + return !info.hasLocalDeletionTime() || !info.isPurgeable(getMaxPurgeableTimestamp(), gcBefore); + } + + @Override + protected boolean include(DeletionTime dt) + { + return includeDelTime(dt); + } + + @Override + protected boolean include(ColumnDefinition c, DeletionTime dt) + { + return includeDelTime(dt); + } + }; + } + + private boolean includeDelTime(DeletionTime dt) + { + return dt.isLive() || !dt.isPurgeable(getMaxPurgeableTimestamp(), gcBefore); + } + + @Override + protected boolean includePartitionDeletion(DeletionTime dt) + { + return includeDelTime(dt); + } + + @Override + protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) + { + if (marker.isBoundary()) + { + // We can only skip the whole marker if both deletion times are purgeable. + // If only one of them is, filterRangeTombstoneMarker will deal with it. + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; + return includeDelTime(boundary.endDeletionTime()) || includeDelTime(boundary.startDeletionTime()); + } + else + { + return includeDelTime(((RangeTombstoneBoundMarker)marker).deletionTime()); + } + } + + @Override + protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) + { + if (!marker.isBoundary()) + return marker; + + // Note that we know this is called after includeRangeTombstoneMarker. So if one of the deletion times is + // purgeable, we know the other one isn't. + RangeTombstoneBoundaryMarker boundary = (RangeTombstoneBoundaryMarker)marker; + if (!(includeDelTime(boundary.closeDeletionTime(reversed)))) + return boundary.createCorrespondingCloseBound(reversed); + else if (!(includeDelTime(boundary.openDeletionTime(reversed)))) + return boundary.createCorrespondingOpenBound(reversed); + return boundary; + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java new file mode 100644 index 0000000..2447da8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.Iterator; + +import org.apache.cassandra.db.rows.UnfilteredRowIterator; + +/** + * An iterator over a number of unfiltered partitions (i.e. partitions containing deletion information). + * + * The object returned by a call to next() is only guaranteed to be + * valid until the next call to hasNext() or next(). If a consumer wants to keep a + * reference on the returned objects for longer than the iteration, it must + * make a copy of it explicitly. + */ +public interface UnfilteredPartitionIterator extends Iterator<UnfilteredRowIterator>, AutoCloseable +{ + /** + * Whether that partition iterator is for a thrift query. + * <p> + * If this is true, the partition iterator may return some empty UnfilteredRowIterators and those + * should be preserved, as thrift includes partitions that "exist" (have some cells even + * if those are actually deleted) but have nothing matching the query. + * + * @return whether the iterator is for a thrift query. + */ + public boolean isForThrift(); + + public void close(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java new file mode 100644 index 0000000..f66ec11 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.partitions; + +import java.io.DataInput; +import java.io.IOError; +import java.io.IOException; +import java.security.MessageDigest; +import java.util.*; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.MergeIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Static methods to work with partition iterators. + */ +public abstract class UnfilteredPartitionIterators +{ + private static final Logger logger = LoggerFactory.getLogger(UnfilteredPartitionIterators.class); + + private static final Serializer serializer = new Serializer(); + + private static final Comparator<UnfilteredRowIterator> partitionComparator = new Comparator<UnfilteredRowIterator>() + { + public int compare(UnfilteredRowIterator p1, UnfilteredRowIterator p2) + { + return p1.partitionKey().compareTo(p2.partitionKey()); + } + }; + + public static final UnfilteredPartitionIterator EMPTY = new AbstractUnfilteredPartitionIterator() + { + public boolean isForThrift() + { + return false; + } + + public boolean hasNext() + { + return false; + } + + public UnfilteredRowIterator next() + { + throw new NoSuchElementException(); + } + }; + + private UnfilteredPartitionIterators() {} + + public interface MergeListener + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions); + public void close(); + } + + @SuppressWarnings("resource") // The created resources are returned right away + public static UnfilteredRowIterator getOnlyElement(final UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command) + { + // If the query has no results, we'll get an empty iterator, but we still + // want a RowIterator out of this method, so we return an empty one. + UnfilteredRowIterator toReturn = iter.hasNext() + ? iter.next() + : UnfilteredRowIterators.emptyIterator(command.metadata(), + command.partitionKey(), + command.clusteringIndexFilter().isReversed()); + + // Note that in general, we should wrap the result so that its close method actually + // closes the whole UnfilteredPartitionIterator. + return new WrappingUnfilteredRowIterator(toReturn) + { + public void close() + { + try + { + super.close(); + } + finally + { + // asserting this only now because it bothers Serializer if hasNext() is called before + // the previously returned iterator has been fully consumed.
+ assert !iter.hasNext(); + + iter.close(); + } + } + }; + } + + public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener) + { + // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators + return filter(merge(iterators, nowInSec, listener), nowInSec); + } + + public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) + { + return new PartitionIterator() + { + private RowIterator next; + + public boolean hasNext() + { + while (next == null && iterator.hasNext()) + { + @SuppressWarnings("resource") // closed either directly if empty, or, if assigned to next, by either + // the caller of next() or close() + UnfilteredRowIterator rowIterator = iterator.next(); + next = UnfilteredRowIterators.filter(rowIterator, nowInSec); + if (!iterator.isForThrift() && next.isEmpty()) + { + rowIterator.close(); + next = null; + } + } + return next != null; + } + + public RowIterator next() + { + if (next == null && !hasNext()) + throw new NoSuchElementException(); + + RowIterator toReturn = next; + next = null; + return toReturn; + } + + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + try + { + iterator.close(); + } + finally + { + if (next != null) + next.close(); + } + } + }; + } + + public static UnfilteredPartitionIterator merge(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener listener) + { + assert listener != null; + assert !iterators.isEmpty(); + + final boolean isForThrift = iterators.get(0).isForThrift(); + + final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>() + { + private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size()); + + private CFMetaData metadata; + private DecoratedKey partitionKey; + private boolean isReverseOrder; + + public void reduce(int idx, UnfilteredRowIterator current) + { + metadata = current.metadata(); + partitionKey = current.partitionKey(); + isReverseOrder = current.isReverseOrder(); + + // Note that because the MergeListener cares about it, we want to preserve the index of the iterator. + // Non-present iterator will thus be set to empty in getReduced. + toMerge.set(idx, current); + } + + protected UnfilteredRowIterator getReduced() + { + UnfilteredRowIterators.MergeListener rowListener = listener.getRowMergeListener(partitionKey, toMerge); + + // Replace nulls by empty iterators + for (int i = 0; i < toMerge.size(); i++) + if (toMerge.get(i) == null) + toMerge.set(i, UnfilteredRowIterators.emptyIterator(metadata, partitionKey, isReverseOrder)); + + return UnfilteredRowIterators.merge(toMerge, nowInSec, rowListener); + } + + protected void onKeyChange() + { + toMerge.clear(); + for (int i = 0; i < iterators.size(); i++) + toMerge.add(null); + } + }); + + return new AbstractUnfilteredPartitionIterator() + { + public boolean isForThrift() + { + return isForThrift; + } + + public boolean hasNext() + { + return merged.hasNext(); + } + + public UnfilteredRowIterator next() + { + return merged.next(); + } + + @Override + public void close() + { + merged.close(); + } + }; + } + + /** + * Convert all expired cells to equivalent tombstones. 
+ * <p> + * See {@link UnfilteredRowIterators#convertExpiredCellsToTombstones} for details. + * + * @param iterator the iterator in which to convert expired cells. + * @param nowInSec the current time to use to decide if a cell is expired. + * @return an iterator that returns the same data as {@code iterator} but with all expired cells converted + * to equivalent tombstones. + */ + public static UnfilteredPartitionIterator convertExpiredCellsToTombstones(UnfilteredPartitionIterator iterator, final int nowInSec) + { + return new WrappingUnfilteredPartitionIterator(iterator) + { + @Override + protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + return UnfilteredRowIterators.convertExpiredCellsToTombstones(iter, nowInSec); + } + }; + } + + public static UnfilteredPartitionIterator mergeLazily(final List<? extends UnfilteredPartitionIterator> iterators, final int nowInSec) + { + assert !iterators.isEmpty(); + + if (iterators.size() == 1) + return iterators.get(0); + + final boolean isForThrift = iterators.get(0).isForThrift(); + + final MergeIterator<UnfilteredRowIterator, UnfilteredRowIterator> merged = MergeIterator.get(iterators, partitionComparator, new MergeIterator.Reducer<UnfilteredRowIterator, UnfilteredRowIterator>() + { + private final List<UnfilteredRowIterator> toMerge = new ArrayList<>(iterators.size()); + + @Override + public boolean trivialReduceIsTrivial() + { + return false; + } + + public void reduce(int idx, UnfilteredRowIterator current) + { + toMerge.add(current); + } + + protected UnfilteredRowIterator getReduced() + { + return new LazilyInitializedUnfilteredRowIterator(toMerge.get(0).partitionKey()) + { + protected UnfilteredRowIterator initializeIterator() + { + return UnfilteredRowIterators.merge(toMerge, nowInSec); + } + }; + } + + protected void onKeyChange() + { + toMerge.clear(); + } + }); + + return new AbstractUnfilteredPartitionIterator() + { + public boolean isForThrift() + { + return isForThrift; + } + + public boolean hasNext() + { + return merged.hasNext(); + } + + public UnfilteredRowIterator next() + { + return merged.next(); + } + + @Override + public void close() + { + merged.close(); + } + }; + } + + public static UnfilteredPartitionIterator removeDroppedColumns(UnfilteredPartitionIterator iterator, final Map<ColumnIdentifier, CFMetaData.DroppedColumn> droppedColumns) + { + return new FilteringPartitionIterator(iterator) + { + @Override + protected FilteringRow makeRowFilter() + { + return new FilteringRow() + { + @Override + protected boolean include(Cell cell) + { + return include(cell.column(), cell.livenessInfo().timestamp()); + } + + @Override + protected boolean include(ColumnDefinition c, DeletionTime dt) + { + return include(c, dt.markedForDeleteAt()); + } + + private boolean include(ColumnDefinition column, long timestamp) + { + CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name); + return dropped == null || timestamp > dropped.droppedTime; + } + }; + } + + @Override + protected boolean shouldFilter(UnfilteredRowIterator iterator) + { + // TODO: We could have row iterators return the smallest timestamp they might return + // (which we can get from sstable stats), and ignore any dropping if that smallest + // timestamp is bigger than the biggest droppedColumns timestamp.
+ + // If none of the dropped columns is part of the columns that the iterator actually returns, there is nothing to do; + for (ColumnDefinition c : iterator.columns()) + if (droppedColumns.containsKey(c.name)) + return true; + + return false; + } + }; + } + + public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest) + { + try (UnfilteredPartitionIterator iter = iterator) + { + while (iter.hasNext()) + { + try (UnfilteredRowIterator partition = iter.next()) + { + UnfilteredRowIterators.digest(partition, digest); + } + } + } + } + + public static Serializer serializerForIntraNode() + { + return serializer; + } + + /** + * Wraps the provided iterator so it logs the returned rows/RT for debugging purposes. + * <p> + * Note that this is only meant for debugging as this can log a very large amount of + * logging at INFO. + */ + public static UnfilteredPartitionIterator loggingIterator(UnfilteredPartitionIterator iterator, final String id, final boolean fullDetails) + { + return new WrappingUnfilteredPartitionIterator(iterator) + { + public UnfilteredRowIterator next() + { + return UnfilteredRowIterators.loggingIterator(super.next(), id, fullDetails); + } + }; + } + + /** + * Serialize each UnfilteredSerializer one after the other, with an initial byte that indicates whether + * we're done or not. + */ + public static class Serializer + { + public void serialize(UnfilteredPartitionIterator iter, DataOutputPlus out, int version) throws IOException + { + if (version < MessagingService.VERSION_30) + throw new UnsupportedOperationException(); + + out.writeBoolean(iter.isForThrift()); + while (iter.hasNext()) + { + out.writeBoolean(true); + try (UnfilteredRowIterator partition = iter.next()) + { + UnfilteredRowIteratorSerializer.serializer.serialize(partition, out, version); + } + } + out.writeBoolean(false); + } + + public UnfilteredPartitionIterator deserialize(final DataInput in, final int version, final SerializationHelper.Flag flag) throws IOException + { + if (version < MessagingService.VERSION_30) + throw new UnsupportedOperationException(); + + final boolean isForThrift = in.readBoolean(); + + return new AbstractUnfilteredPartitionIterator() + { + private UnfilteredRowIterator next; + private boolean hasNext; + private boolean nextReturned = true; + + public boolean isForThrift() + { + return isForThrift; + } + + public boolean hasNext() + { + if (!nextReturned) + return hasNext; + + // We can't answer this until the previously returned iterator has been fully consumed, + // so complain if that's not the case. 
+ if (next != null && next.hasNext()) + throw new IllegalStateException("Cannot call hasNext() until the previous iterator has been fully consumed"); + + try + { + hasNext = in.readBoolean(); + nextReturned = false; + return hasNext; + } + catch (IOException e) + { + throw new IOError(e); + } + } + + public UnfilteredRowIterator next() + { + if (nextReturned && !hasNext()) + throw new NoSuchElementException(); + + try + { + nextReturned = true; + next = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, flag); + return next; + } + catch (IOException e) + { + throw new IOError(e); + } + } + + @Override + public void close() + { + if (next != null) + next.close(); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java new file mode 100644 index 0000000..4d4be70 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.RowIterator; + +public abstract class WrappingPartitionIterator implements PartitionIterator +{ + protected final PartitionIterator wrapped; + + protected WrappingPartitionIterator(PartitionIterator wrapped) + { + this.wrapped = wrapped; + } + + public boolean hasNext() + { + return wrapped.hasNext(); + } + + public RowIterator next() + { + return wrapped.next(); + } + + public void remove() + { + wrapped.remove(); + } + + public void close() + { + wrapped.close(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java new file mode 100644 index 0000000..4f35075 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; + +/** + * A utility class for writing partition iterators that filter/modify other + * partition iterators. + * + * This works a little bit like Guava's AbstractIterator in that you only need + * to implement the computeNext() method, though that method takes as argument + * the UnfilteredRowIterator to filter from the wrapped partition iterator. + */ +public abstract class WrappingUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator +{ + protected final UnfilteredPartitionIterator wrapped; + + private UnfilteredRowIterator next; + + protected WrappingUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped) + { + this.wrapped = wrapped; + } + + public boolean isForThrift() + { + return wrapped.isForThrift(); + } + + public boolean hasNext() + { + prepareNext(); + return next != null; + } + + public UnfilteredRowIterator next() + { + prepareNext(); + assert next != null; + + UnfilteredRowIterator toReturn = next; + next = null; + return toReturn; + } + + private void prepareNext() + { + while (next == null && wrapped.hasNext()) + { + @SuppressWarnings("resource") // Closed on exception, right away if empty or ignored by computeNext, or if assigned to 'next', + // either by the caller to next(), or in close(). + UnfilteredRowIterator wrappedNext = wrapped.next(); + try + { + UnfilteredRowIterator maybeNext = computeNext(wrappedNext); + + // As the wrapped iterator shouldn't return an empty iterator, if computeNext + // gave us back its input, we save the isEmpty check. + if (maybeNext != null && (isForThrift() || maybeNext == wrappedNext || !maybeNext.isEmpty())) + { + next = maybeNext; + return; + } + else + { + wrappedNext.close(); + } + } + catch (RuntimeException | Error e) + { + wrappedNext.close(); + throw e; + } + } + } + + /** + * Given the next UnfilteredRowIterator from the wrapped partition iterator, return + * the (potentially modified) UnfilteredRowIterator to return. Please note that the + * result will be skipped if it's either {@code null} or if it's empty. + * + * The default implementation returns its input unchanged to make it easier + * to write wrapping partition iterators that only change the close method.
+ */ + protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + return iter; + } + + @Override + public void close() + { + try + { + wrapped.close(); + } + finally + { + if (next != null) + next.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java new file mode 100644 index 0000000..c003d6f --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.security.MessageDigest; +import java.util.Objects; + +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Base abstract class for {@code Cell} implementations. + * + * Unless you have a very good reason not to, every cell implementation + * should probably extend this class. + */ +public abstract class AbstractCell implements Cell +{ + public boolean isLive(int nowInSec) + { + return livenessInfo().isLive(nowInSec); + } + + public boolean isTombstone() + { + return livenessInfo().hasLocalDeletionTime() && !livenessInfo().hasTTL(); + } + + public boolean isExpiring() + { + return livenessInfo().hasTTL(); + } + + public void writeTo(Row.Writer writer) + { + writer.writeCell(column(), isCounterCell(), value(), livenessInfo(), path()); + } + + public void digest(MessageDigest digest) + { + digest.update(value().duplicate()); + livenessInfo().digest(digest); + FBUtilities.updateWithBoolean(digest, isCounterCell()); + if (path() != null) + path().digest(digest); + } + + public void validate() + { + column().validateCellValue(value()); + + livenessInfo().validate(); + + // If cell is a tombstone, it shouldn't have a value. 
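+ // (the deletion itself is carried by the cell's LivenessInfo local deletion time, so any + // remaining value bytes would be meaningless and are treated as corruption.)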
+ if (isTombstone() && value().hasRemaining()) + throw new MarshalException("A tombstone should not have a value"); + + if (path() != null) + column().validateCellPath(path()); + } + + public int dataSize() + { + int size = value().remaining() + livenessInfo().dataSize(); + if (path() != null) + size += path().dataSize(); + return size; + + } + + @Override + public boolean equals(Object other) + { + if(!(other instanceof Cell)) + return false; + + Cell that = (Cell)other; + return this.column().equals(that.column()) + && this.isCounterCell() == that.isCounterCell() + && Objects.equals(this.value(), that.value()) + && Objects.equals(this.livenessInfo(), that.livenessInfo()) + && Objects.equals(this.path(), that.path()); + } + + @Override + public int hashCode() + { + return Objects.hash(column(), isCounterCell(), value(), livenessInfo(), path()); + } + + @Override + public String toString() + { + if (isCounterCell()) + return String.format("[%s=%d ts=%d]", column().name, CounterContext.instance().total(value()), livenessInfo().timestamp()); + + AbstractType<?> type = column().type; + if (type instanceof CollectionType && type.isMultiCell()) + { + CollectionType ct = (CollectionType)type; + return String.format("[%s[%s]=%s info=%s]", + column().name, + ct.nameComparator().getString(path().get(0)), + ct.valueComparator().getString(value()), + livenessInfo()); + } + return String.format("[%s=%s info=%s]", column().name, type.getString(value()), livenessInfo()); + } + + public Cell takeAlias() + { + // Cell is always used as an Aliasable object but as the code currently + // never need to alias a cell outside of its valid scope, we don't yet + // need that. + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java new file mode 100644 index 0000000..d8256fc --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AbstractRangeTombstoneMarker.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +public abstract class AbstractRangeTombstoneMarker implements RangeTombstoneMarker +{ + protected final RangeTombstone.Bound bound; + + protected AbstractRangeTombstoneMarker(RangeTombstone.Bound bound) + { + this.bound = bound; + } + + public RangeTombstone.Bound clustering() + { + return bound; + } + + public Unfiltered.Kind kind() + { + return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; + } + + public void validateData(CFMetaData metadata) + { + Slice.Bound bound = clustering(); + for (int i = 0; i < bound.size(); i++) + { + ByteBuffer value = bound.get(i); + if (value != null) + metadata.comparator.subtype(i).validate(value); + } + } + + public String toString(CFMetaData metadata, boolean fullDetails) + { + return toString(metadata); + } + + protected void copyBoundTo(RangeTombstoneMarker.Writer writer) + { + for (int i = 0; i < bound.size(); i++) + writer.writeClusteringValue(bound.get(i)); + writer.writeBoundKind(bound.kind()); + } + + public Unfiltered takeAlias() + { + return this; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java new file mode 100644 index 0000000..03aeb88 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AbstractReusableRow.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.util.Iterator; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.utils.SearchIterator; + +public abstract class AbstractReusableRow extends AbstractRow +{ + private CellData.ReusableCell simpleCell; + private ComplexRowDataBlock.ReusableIterator complexCells; + private DeletionTimeArray.Cursor complexDeletionCursor; + private RowDataBlock.ReusableIterator iterator; + + public AbstractReusableRow() + { + } + + protected abstract int row(); + protected abstract RowDataBlock data(); + + private CellData.ReusableCell simpleCell() + { + if (simpleCell == null) + simpleCell = SimpleRowDataBlock.reusableCell(); + return simpleCell; + } + + private ComplexRowDataBlock.ReusableIterator complexCells() + { + if (complexCells == null) + complexCells = ComplexRowDataBlock.reusableComplexCells(); + return complexCells; + } + + private DeletionTimeArray.Cursor complexDeletionCursor() + { + if (complexDeletionCursor == null) + complexDeletionCursor = ComplexRowDataBlock.complexDeletionCursor(); + return complexDeletionCursor; + } + + private RowDataBlock.ReusableIterator reusableIterator() + { + if (iterator == null) + iterator = RowDataBlock.reusableIterator(); + return iterator; + } + + public Columns columns() + { + return data().columns(); + } + + public Cell getCell(ColumnDefinition c) + { + assert !c.isComplex(); + if (data().simpleData == null) + return null; + + int idx = columns().simpleIdx(c, 0); + if (idx < 0) + return null; + + return simpleCell().setTo(data().simpleData.data, c, (row() * columns().simpleColumnCount()) + idx); + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + assert c.isComplex(); + + ComplexRowDataBlock data = data().complexData; + if (data == null) + return null; + + int idx = data.cellIdx(row(), c, path); + if (idx < 0) + return null; + + return simpleCell().setTo(data.cellData(row()), c, idx); + } + + public Iterator<Cell> getCells(ColumnDefinition c) + { + assert c.isComplex(); + return complexCells().setTo(data().complexData, row(), c); + } + + public boolean hasComplexDeletion() + { + return data().hasComplexDeletion(row()); + } + + public DeletionTime getDeletion(ColumnDefinition c) + { + assert c.isComplex(); + if (data().complexData == null) + return DeletionTime.LIVE; + + int idx = data().complexData.complexDeletionIdx(row(), c); + return idx < 0 + ? DeletionTime.LIVE + : complexDeletionCursor().setTo(data().complexData.complexDelTimes, idx); + } + + public Iterator<Cell> iterator() + { + return reusableIterator().setTo(data(), row()); + } + + public SearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return new SearchIterator<ColumnDefinition, ColumnData>() + { + private int simpleIdx = 0; + + public boolean hasNext() + { + // TODO: we can do better, but we expect users not to rely on this anyway + return true; + } + + public ColumnData next(ColumnDefinition column) + { + if (column.isComplex()) + { + // TODO: this is sub-optimal + + Iterator<Cell> cells = getCells(column); + return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column)); + } + else + { + int idx = columns().simpleIdx(column, simpleIdx); + if (idx < 0) + return null; + + Cell cell = simpleCell().setTo(data().simpleData.data, column, (row() * columns().simpleColumnCount()) + idx); + simpleIdx = idx + 1; + return cell == null ?
null : new ColumnData(column, cell, null, null); + } + } + }; + } + + public Row takeAlias() + { + final Clustering clustering = clustering().takeAlias(); + final LivenessInfo info = primaryKeyLivenessInfo().takeAlias(); + final DeletionTime deletion = deletion().takeAlias(); + + final RowDataBlock data = data(); + final int row = row(); + + return new AbstractReusableRow() + { + protected RowDataBlock data() + { + return data; + } + + protected int row() + { + return row; + } + + public Clustering clustering() + { + return clustering; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return info; + } + + public DeletionTime deletion() + { + return deletion; + } + + @Override + public Row takeAlias() + { + return this; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/AbstractRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java b/src/java/org/apache/cassandra/db/rows/AbstractRow.java new file mode 100644 index 0000000..a99bc78 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.util.Iterator; +import java.util.Objects; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Base abstract class for {@code Row} implementations. + * + * Unless you have a very good reason not to, every row implementation + * should probably extend this class.
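+ * <p> + * Concrete implementations need only expose the row's clustering, primary key + * liveness info, deletion and cells; the digest, validation, copy and equality + * logic below is derived from those accessors.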
+ */ +public abstract class AbstractRow implements Row +{ + public Unfiltered.Kind kind() + { + return Unfiltered.Kind.ROW; + } + + public boolean hasLiveData(int nowInSec) + { + if (primaryKeyLivenessInfo().isLive(nowInSec)) + return true; + + for (Cell cell : this) + if (cell.isLive(nowInSec)) + return true; + + return false; + } + + public boolean isEmpty() + { + return !primaryKeyLivenessInfo().hasTimestamp() + && deletion().isLive() + && !iterator().hasNext() + && !hasComplexDeletion(); + } + + public boolean isStatic() + { + return clustering() == Clustering.STATIC_CLUSTERING; + } + + public void digest(MessageDigest digest) + { + FBUtilities.updateWithByte(digest, kind().ordinal()); + clustering().digest(digest); + + deletion().digest(digest); + primaryKeyLivenessInfo().digest(digest); + + Iterator<ColumnDefinition> iter = columns().complexColumns(); + while (iter.hasNext()) + getDeletion(iter.next()).digest(digest); + + for (Cell cell : this) + cell.digest(digest); + } + + /** + * Copy this row to the provided writer. + * + * @param writer the row writer to write this row to. + */ + public void copyTo(Row.Writer writer) + { + Rows.writeClustering(clustering(), writer); + writer.writePartitionKeyLivenessInfo(primaryKeyLivenessInfo()); + writer.writeRowDeletion(deletion()); + + for (Cell cell : this) + cell.writeTo(writer); + + for (int i = 0; i < columns().complexColumnCount(); i++) + { + ColumnDefinition c = columns().getComplex(i); + DeletionTime dt = getDeletion(c); + if (!dt.isLive()) + writer.writeComplexDeletion(c, dt); + } + writer.endOfRow(); + } + + public void validateData(CFMetaData metadata) + { + Clustering clustering = clustering(); + for (int i = 0; i < clustering.size(); i++) + { + ByteBuffer value = clustering.get(i); + if (value != null) + metadata.comparator.subtype(i).validate(value); + } + + primaryKeyLivenessInfo().validate(); + if (deletion().localDeletionTime() < 0) + throw new MarshalException("A local deletion time should not be negative"); + + for (Cell cell : this) + cell.validate(); + } + + public String toString(CFMetaData metadata) + { + return toString(metadata, false); + } + + public String toString(CFMetaData metadata, boolean fullDetails) + { + StringBuilder sb = new StringBuilder(); + sb.append("Row"); + if (fullDetails) + { + sb.append("[info=").append(primaryKeyLivenessInfo()); + if (!deletion().isLive()) + sb.append(" del=").append(deletion()); + sb.append(" ]"); + } + sb.append(": ").append(clustering().toString(metadata)).append(" | "); + boolean isFirst = true; + ColumnDefinition prevColumn = null; + for (Cell cell : this) + { + if (isFirst) isFirst = false; else sb.append(", "); + if (fullDetails) + { + if (cell.column().isComplex() && !cell.column().equals(prevColumn)) + { + DeletionTime complexDel = getDeletion(cell.column()); + if (!complexDel.isLive()) + sb.append("del(").append(cell.column().name).append(")=").append(complexDel).append(", "); + } + sb.append(cell); + prevColumn = cell.column(); + } + else + { + sb.append(cell.column().name); + if (cell.column().type instanceof CollectionType) + { + CollectionType ct = (CollectionType)cell.column().type; + sb.append("[").append(ct.nameComparator().getString(cell.path().get(0))).append("]"); + sb.append("=").append(ct.valueComparator().getString(cell.value())); + } + else + { + sb.append("=").append(cell.column().type.getString(cell.value())); + } + } + } + return sb.toString(); + } + + @Override + public boolean equals(Object other) + { + if(!(other instanceof Row)) + return false; + + 
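// 'other' is known to be a Row at this point: rows compare equal when clustering, + // columns, liveness info and deletion all match and their cells are pairwise + // equal, in iteration order. +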
Row that = (Row)other; + if (!this.clustering().equals(that.clustering()) + || !this.columns().equals(that.columns()) + || !this.primaryKeyLivenessInfo().equals(that.primaryKeyLivenessInfo()) + || !this.deletion().equals(that.deletion())) + return false; + + Iterator<Cell> thisCells = this.iterator(); + Iterator<Cell> thatCells = that.iterator(); + while (thisCells.hasNext()) + { + if (!thatCells.hasNext() || !thisCells.next().equals(thatCells.next())) + return false; + } + return !thatCells.hasNext(); + } + + @Override + public int hashCode() + { + int hash = Objects.hash(clustering(), columns(), primaryKeyLivenessInfo(), deletion()); + for (Cell cell : this) + hash += 31 * cell.hashCode(); + return hash; + } +}
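Taken together, Cell.writeTo(Row.Writer) above and AbstractRow.copyTo(Row.Writer) define a streaming contract: the row's clustering is written first (via Rows.writeClustering), then the primary key liveness info and row deletion, then every cell, then any non-live complex deletion, and finally endOfRow(). The sketch below is hypothetical and not part of this commit; it assumes Row.Writer declares exactly the callbacks invoked above, while the real interface may declare more:

import java.nio.ByteBuffer;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;      // assumed to provide LivenessInfo and DeletionTime, as in the files above
import org.apache.cassandra.db.rows.*; // assumed to provide Row and CellPath

// Hypothetical sketch, not part of this commit: a Row.Writer that only counts
// the cells a row streams through AbstractRow.copyTo(Row.Writer).
public class CellCountingWriter implements Row.Writer
{
    private int cells;

    // Clustering values arrive first, via Rows.writeClustering(clustering(), writer).
    public void writeClusteringValue(ByteBuffer value) { }

    public void writePartitionKeyLivenessInfo(LivenessInfo info) { }

    public void writeRowDeletion(DeletionTime deletion) { }

    // One callback per cell, in iteration order.
    public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path)
    {
        cells++;
    }

    // Only non-live complex deletions are written, and only after the cells.
    public void writeComplexDeletion(ColumnDefinition column, DeletionTime deletion) { }

    // Always the last callback for a row.
    public void endOfRow() { }

    public int cellCount()
    {
        return cells;
    }
}

Driving a fresh CellCountingWriter w with row.copyTo(w) would fire the callbacks in the order described, after which w.cellCount() returns the number of cells the row held.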

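Finally, the liveness predicates AbstractCell derives from LivenessInfo form a small state model: a local deletion time without a TTL marks a tombstone, a TTL marks an expiring cell (live until it expires), and anything else is a regular cell whose liveness depends on its LivenessInfo alone. A hypothetical helper, not part of this commit, that names these states using only the predicates defined above:

import org.apache.cassandra.db.rows.Cell; // assumed package, matching the files in this commit

public final class CellStates
{
    private CellStates() { }

    // Hypothetical helper, not part of this commit: classifies a cell using only
    // the predicates AbstractCell defines (isTombstone, isExpiring, isLive).
    public static String classify(Cell cell, int nowInSec)
    {
        if (cell.isTombstone())  // local deletion time set, no TTL
            return "tombstone";
        if (cell.isExpiring())   // has a TTL
            return cell.isLive(nowInSec) ? "expiring" : "expired";
        return cell.isLive(nowInSec) ? "live" : "not live";
    }
}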