Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aeca1d2b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aeca1d2b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aeca1d2b Branch: refs/heads/trunk Commit: aeca1d2bd8e395a2897c3e36224f49b586babd4e Parents: 31dec3d 5ef8a8b Author: Benjamin Lerer <b.le...@gmail.com> Authored: Fri Mar 10 10:01:01 2017 +0100 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Fri Mar 10 10:02:21 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/UpdateParameters.java | 24 ++++- .../org/apache/cassandra/db/rows/BTreeRow.java | 43 ++++++-- src/java/org/apache/cassandra/db/rows/Row.java | 6 ++ .../org/apache/cassandra/utils/btree/BTree.java | 19 ++++ .../validation/entities/CollectionsTest.java | 100 +++++++++++++++++++ .../apache/cassandra/db/rows/RowBuilder.java | 7 ++ 7 files changed, 191 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 1876922,09e4039..52a794b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,20 -1,6 +1,21 @@@ -2.2.10 +3.0.13 + * Slice.isEmpty() returns false for some empty slices (CASSANDRA-13305) + * Add formatted row output to assertEmpty in CQL Tester (CASSANDRA-13238) +Merged from 2.2: + * Fix queries updating multiple time the same list (CASSANDRA-13130) * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053) + + +3.0.12 + * Prevent data loss on upgrade 2.1 - 3.0 by adding component separator to LogRecord absolute path (CASSANDRA-13294) + * Improve testing on macOS by eliminating sigar logging (CASSANDRA-13233) + * Cqlsh copy-from should error out when csv contains invalid data for collections (CASSANDRA-13071) + * Update c.yaml doc for offheap memtables (CASSANDRA-13179) + * Faster StreamingHistogram (CASSANDRA-13038) + * Legacy deserializer can create unexpected boundary range tombstones (CASSANDRA-13237) + * Remove unnecessary assertion from AntiCompactionTest (CASSANDRA-13070) + * Fix cqlsh COPY for dates before 1900 (CASSANDRA-13185) +Merged from 2.2: * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886) * Fix "multiple versions of ant detected..." 
when running ant test (CASSANDRA-13232) * Coalescing strategy sleeps too much (CASSANDRA-13090) http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cql3/UpdateParameters.java index 0c58097,65edef7..d902dec --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@@ -80,134 -59,71 +80,156 @@@ public class UpdateParameter throw new InvalidRequestException(String.format("Out of bound timestamp, must be in [%d, %d]", Long.MIN_VALUE + 1, Long.MAX_VALUE)); } - public Cell makeColumn(CellName name, ByteBuffer value) throws InvalidRequestException + public void newRow(Clustering clustering) throws InvalidRequestException + { + if (metadata.isDense() && !metadata.isCompound()) + { + // If it's a COMPACT STORAGE table with a single clustering column, the clustering value is + // translated in Thrift to the full Thrift column name, and for backward compatibility we + // don't want to allow that to be empty (even though this would be fine for the storage engine). + assert clustering.size() == 1; + ByteBuffer value = clustering.get(0); + if (value == null || !value.hasRemaining()) + throw new InvalidRequestException("Invalid empty or null value for column " + metadata.clusteringColumns().get(0).name); + } + + if (clustering == Clustering.STATIC_CLUSTERING) + { + if (staticBuilder == null) + staticBuilder = BTreeRow.unsortedBuilder(nowInSec); + builder = staticBuilder; + } + else + { + if (regularBuilder == null) + regularBuilder = BTreeRow.unsortedBuilder(nowInSec); + builder = regularBuilder; + } + + builder.newRow(clustering); + } + + public Clustering currentClustering() + { + return builder.clustering(); + } + + public void addPrimaryKeyLivenessInfo() + { + builder.addPrimaryKeyLivenessInfo(LivenessInfo.create(metadata, timestamp, ttl, nowInSec)); + } + + public void addRowDeletion() + { + // For compact tables, at the exclusion of the static row (of static compact tables), each row ever has a single column, + // the "compact" one. As such, deleting the row or deleting that single cell is equivalent. We favor the later however + // because that makes it easier when translating back to the old format layout (for thrift and pre-3.0 backward + // compatibility) as we don't have to special case for the row deletion. This is also in line with what we used to do pre-3.0. 
+ if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING) + addTombstone(metadata.compactValueColumn()); + else + builder.addRowDeletion(Row.Deletion.regular(deletionTime)); + } + + public void addTombstone(ColumnDefinition column) throws InvalidRequestException { - QueryProcessor.validateCellName(name, metadata.comparator); - return AbstractCell.create(name, value, timestamp, ttl, metadata); + addTombstone(column, null); } - public Cell makeCounter(CellName name, long delta) throws InvalidRequestException - { - QueryProcessor.validateCellName(name, metadata.comparator); - return new BufferCounterUpdateCell(name, delta, FBUtilities.timestampMicros()); - } + public void addTombstone(ColumnDefinition column, CellPath path) throws InvalidRequestException + { + builder.addCell(BufferCell.tombstone(column, timestamp, nowInSec, path)); + } + + public void addCell(ColumnDefinition column, ByteBuffer value) throws InvalidRequestException + { + addCell(column, null, value); + } + + public void addCell(ColumnDefinition column, CellPath path, ByteBuffer value) throws InvalidRequestException + { + Cell cell = ttl == LivenessInfo.NO_TTL + ? BufferCell.live(metadata, column, timestamp, value, path) + : BufferCell.expiring(column, timestamp, ttl, nowInSec, value, path); + builder.addCell(cell); + } + + public void addCounter(ColumnDefinition column, long increment) throws InvalidRequestException + { + assert ttl == LivenessInfo.NO_TTL; + + // Because column is a counter, we need the value to be a CounterContext. However, we're only creating a + // "counter update", which is a temporary state until we run into 'CounterMutation.updateWithCurrentValue()' + // which does the read-before-write and sets the proper CounterId, clock and updated value. + // + // We thus create a "fake" local shard here. The CounterId/clock used don't matter as this is just a temporary + // state that will be replaced when processing the mutation in CounterMutation, but the reason we use a 'local' + // shard is due to the merging rules: if a user includes multiple updates to the same counter in a batch, those + // multiple updates will be merged in the PartitionUpdate *before* they even reach CounterMutation. So we need + // such update to be added together, and that's what a local shard gives us. 
+ builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createLocal(increment))); + } - public Cell makeTombstone(CellName name) throws InvalidRequestException + public void setComplexDeletionTime(ColumnDefinition column) { - QueryProcessor.validateCellName(name, metadata.comparator); - return new BufferDeletedCell(name, localDeletionTime, timestamp); + builder.addComplexDeletion(column, deletionTime); } - public RangeTombstone makeRangeTombstone(ColumnSlice slice) throws InvalidRequestException + public void setComplexDeletionTimeForOverwrite(ColumnDefinition column) { - QueryProcessor.validateComposite(slice.start, metadata.comparator); - QueryProcessor.validateComposite(slice.finish, metadata.comparator); - return new RangeTombstone(slice.start, slice.finish, timestamp, localDeletionTime); + builder.addComplexDeletion(column, new DeletionTime(deletionTime.markedForDeleteAt() - 1, deletionTime.localDeletionTime())); } - public RangeTombstone makeTombstoneForOverwrite(ColumnSlice slice) throws InvalidRequestException + public Row buildRow() { - QueryProcessor.validateComposite(slice.start, metadata.comparator); - QueryProcessor.validateComposite(slice.finish, metadata.comparator); - return new RangeTombstone(slice.start, slice.finish, timestamp - 1, localDeletionTime); + Row built = builder.build(); + builder = null; // Resetting to null just so we quickly bad usage where we forget to call newRow() after that. + return built; + } + + public DeletionTime deletionTime() + { + return deletionTime; + } + + public RangeTombstone makeRangeTombstone(ClusteringComparator comparator, Clustering clustering) + { + return makeRangeTombstone(Slice.make(comparator, clustering)); + } + + public RangeTombstone makeRangeTombstone(Slice slice) + { + return new RangeTombstone(slice, deletionTime); } + /** - * Returns the prefetched list with the already performed modifications. - * <p>If no modification have yet been performed this method will return the fetched list. - * If some modifications (updates or deletions) have already been done the list returned - * will be the result of the merge of the fetched list and of the pending mutations.</p> ++ * Returns the prefetched row with the already performed modifications. ++ * <p>If no modification have yet been performed this method will return the fetched row or {@code null} if ++ * the row does not exist. If some modifications (updates or deletions) have already been done the row returned ++ * will be the result of the merge of the fetched row and of the pending mutations.</p> + * - * @param rowKey the row key - * @param cql3ColumnName the column name - * @param cf the pending modifications - * @return the prefetched list with the already performed modifications ++ * @param key the partition key ++ * @param clustering the row clustering ++ * @return the prefetched row with the already performed modifications + */ - public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName, ColumnFamily cf) + public Row getPrefetchedRow(DecoratedKey key, Clustering clustering) { - if (prefetchedLists == null) - return Collections.emptyList(); + if (prefetchedRows == null) + return null; - CQL3Row row = prefetchedLists.get(rowKey); + Partition partition = prefetchedRows.get(key); - return partition == null ? null : partition.searchIterator(ColumnFilter.selection(partition.columns()), false).next(clustering); ++ Row prefetchedRow = partition == null ? 
null : partition.searchIterator(ColumnFilter.selection(partition.columns()), false).next(clustering); + - List<Cell> cql3List = row == null ? Collections.<Cell>emptyList() : row.getMultiCellColumn(cql3ColumnName); ++ // We need to apply the pending mutations to return the row in its current state ++ Row pendingMutations = builder.copy().build(); + - if (!cf.isEmpty()) - { - ColumnFamily currentCf = cf.cloneMe(); - - for (Cell c : cql3List) - currentCf.addColumn(c); ++ if (pendingMutations.isEmpty()) ++ return prefetchedRow; + - CFMetaData cfm = currentCf.metadata(); - CQL3Row.RowIterator iterator = cfm.comparator.CQL3RowBuilder(cfm, timestamp).group(currentCf.iterator()); - // We can only update one CQ3Row per partition key at a time (we don't allow IN for clustering key) - cql3List = iterator.hasNext() ? iterator.next().getMultiCellColumn(cql3ColumnName) : null; - } ++ if (prefetchedRow == null) ++ return pendingMutations; + - return (cql3List == null) ? Collections.<Cell>emptyList() : cql3List; ++ return Rows.merge(prefetchedRow, pendingMutations, nowInSec) ++ .purge(DeletionPurger.PURGE_ALL, nowInSec); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/BTreeRow.java index ea1d9e0,0000000..fda33d6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@@ -1,697 -1,0 +1,724 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.function.Predicate; + +import com.google.common.base.Function; +import com.google.common.collect.Collections2; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; +import org.apache.cassandra.utils.btree.UpdateFunction; + +/** + * Immutable implementation of a Row object. 
+ */ +public class BTreeRow extends AbstractRow +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY)); + + private final Clustering clustering; + private final LivenessInfo primaryKeyLivenessInfo; + private final Deletion deletion; + + // The data for each columns present in this row in column sorted order. + private final Object[] btree; + + // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstone, and then after reconciliation to remove + // all tombstone since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so + // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones, + // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allow at any given + // time to know if we have any deleted information or not. If we any "true" tombstone (i.e. not an expiring cell), this value will be forced to + // Integer.MIN_VALUE, but if we don't and have expiring cells, this will the time at which the first expiring cell expires. If we have no tombstones and + // no expiring cells, this will be Integer.MAX_VALUE; + private final int minLocalDeletionTime; + + private BTreeRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree, int minLocalDeletionTime) + { + assert !deletion.isShadowedBy(primaryKeyLivenessInfo); + this.clustering = clustering; + this.primaryKeyLivenessInfo = primaryKeyLivenessInfo; + this.deletion = deletion; + this.btree = btree; + this.minLocalDeletionTime = minLocalDeletionTime; + } + + private BTreeRow(Clustering clustering, Object[] btree, int minLocalDeletionTime) + { + this(clustering, LivenessInfo.EMPTY, Deletion.LIVE, btree, minLocalDeletionTime); + } + + // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases. 
+ public static BTreeRow create(Clustering clustering, LivenessInfo primaryKeyLivenessInfo, Deletion deletion, Object[] btree) + { + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion.time())); + if (minDeletionTime != Integer.MIN_VALUE) + { + for (ColumnData cd : BTree.<ColumnData>iterable(btree)) + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); + } + + return new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + } + + public static BTreeRow emptyRow(Clustering clustering) + { + return new BTreeRow(clustering, BTree.empty(), Integer.MAX_VALUE); + } + + public static BTreeRow singleCellRow(Clustering clustering, Cell cell) + { + if (cell.column().isSimple()) + return new BTreeRow(clustering, BTree.singleton(cell), minDeletionTime(cell)); + + ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); + return new BTreeRow(clustering, BTree.singleton(complexData), minDeletionTime(cell)); + } + + public static BTreeRow emptyDeletedRow(Clustering clustering, Deletion deletion) + { + assert !deletion.isLive(); + return new BTreeRow(clustering, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE); + } + + public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo) + { + assert !primaryKeyLivenessInfo.isEmpty(); + return new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo)); + } + + private static int minDeletionTime(Cell cell) + { + return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime(); + } + + private static int minDeletionTime(LivenessInfo info) + { + return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE; + } + + private static int minDeletionTime(DeletionTime dt) + { + return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE; + } + + private static int minDeletionTime(ComplexColumnData cd) + { + int min = minDeletionTime(cd.complexDeletion()); + for (Cell cell : cd) + { + min = Math.min(min, minDeletionTime(cell)); + if (min == Integer.MIN_VALUE) + break; + } + return min; + } + + private static int minDeletionTime(ColumnData cd) + { + return cd.column().isSimple() ? 
minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd); + } + + private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion) + { + int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)); + for (ColumnData cd : BTree.<ColumnData>iterable(btree)) + { + min = Math.min(min, minDeletionTime(cd)); + if (min == Integer.MIN_VALUE) + break; + } + return min; + } + + public Clustering clustering() + { + return clustering; + } + + public Collection<ColumnDefinition> columns() + { + return Collections2.transform(this, ColumnData::column); + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return primaryKeyLivenessInfo; + } + + public boolean isEmpty() + { + return primaryKeyLivenessInfo().isEmpty() + && deletion().isLive() + && BTree.isEmpty(btree); + } + + public Deletion deletion() + { + return deletion; + } + + public Cell getCell(ColumnDefinition c) + { + assert !c.isComplex(); + return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + assert c.isComplex(); + ComplexColumnData cd = getComplexColumnData(c); + if (cd == null) + return null; + return cd.getCell(path); + } + + public ComplexColumnData getComplexColumnData(ColumnDefinition c) + { + assert c.isComplex(); + return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); + } + + public int size() + { + return BTree.size(btree); + } + + public Iterator<ColumnData> iterator() + { + return searchIterator(); + } + + public Iterable<Cell> cells() + { + return CellIterator::new; + } + + public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC); + } + + public Row filter(ColumnFilter filter, CFMetaData metadata) + { + return filter(filter, DeletionTime.LIVE, false, metadata); + } + + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata) + { + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); + + if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty()) + return this; + + boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time()); + + LivenessInfo newInfo = primaryKeyLivenessInfo; + Deletion newDeletion = deletion; + if (mayHaveShadowed) + { + if (activeDeletion.deletes(newInfo.timestamp())) + newInfo = LivenessInfo.EMPTY; + // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow, + // the row deletion is shadowed and we shouldn't return it. + newDeletion = setActiveDeletionToRow ? Deletion.regular(activeDeletion) : Deletion.LIVE; + } + + Columns columns = filter.fetchedColumns().columns(isStatic()); + Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester(); + return transformAndFilter(newInfo, newDeletion, (cd) -> { + + ColumnDefinition column = cd.column(); + if (!inclusionTester.test(column)) + return null; + + CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes); + if (column.isComplex()) + return ((ComplexColumnData) cd).filter(filter, mayHaveShadowed ? 
activeDeletion : DeletionTime.LIVE, dropped); + + Cell cell = (Cell) cd; + return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)) + ? cell : null; + }); + } + + public boolean hasComplex() + { + // We start by the end cause we know complex columns sort after the simple ones + ColumnData cd = Iterables.getFirst(BTree.<ColumnData>iterable(btree, BTree.Dir.DESC), null); + return cd != null && cd.column.isComplex(); + } + + public boolean hasComplexDeletion() + { + // We start by the end cause we know complex columns sort before simple ones + for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC)) + { + if (cd.column().isSimple()) + return false; + + if (!((ComplexColumnData)cd).complexDeletion().isLive()) + return true; + } + return false; + } + + public Row markCounterLocalToBeCleared() + { + return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter() + ? cd.markCounterLocalToBeCleared() + : cd); + } + + public boolean hasDeletion(int nowInSec) + { + return nowInSec >= minLocalDeletionTime; + } + + /** + * Returns a copy of the row where all timestamps for live data have replaced by {@code newTimestamp} and + * all deletion timestamp by {@code newTimestamp - 1}. + * + * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details. + */ + public Row updateAllTimestamp(long newTimestamp) + { + LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp); + // If the deletion is shadowable and the row has a timestamp, we'll forced the deletion timestamp to be less than the row one, so we + // should get rid of said deletion. + Deletion newDeletion = deletion.isLive() || (deletion.isShadowable() && !primaryKeyLivenessInfo.isEmpty()) + ? Deletion.LIVE + : new Deletion(new DeletionTime(newTimestamp - 1, deletion.time().localDeletionTime()), deletion.isShadowable()); + + return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp)); + } + + public Row withRowDeletion(DeletionTime newDeletion) + { + // Note that: + // - it is a contract with the caller that the new deletion shouldn't shadow anything in + // the row, and so in particular it can't shadow the row deletion. So if there is a + // already a row deletion we have nothing to do. + // - we set the minLocalDeletionTime to MIN_VALUE because we know the deletion is live + return newDeletion.isLive() || !deletion.isLive() + ? this + : new BTreeRow(clustering, primaryKeyLivenessInfo, Deletion.regular(newDeletion), btree, Integer.MIN_VALUE); + } + + public Row purge(DeletionPurger purger, int nowInSec) + { + if (!hasDeletion(nowInSec)) + return this; + + LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo; + Deletion newDeletion = purger.shouldPurge(deletion.time()) ? 
Deletion.LIVE : deletion; + + return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec)); + } + + private Row transformAndFilter(LivenessInfo info, Deletion deletion, Function<ColumnData, ColumnData> function) + { + Object[] transformed = BTree.transformAndFilter(btree, function); + + if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion) + return this; + + if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed)) + return null; + + int minDeletionTime = minDeletionTime(transformed, info, deletion.time()); + return new BTreeRow(clustering, info, deletion, transformed, minDeletionTime); + } + + public int dataSize() + { + int dataSize = clustering.dataSize() + + primaryKeyLivenessInfo.dataSize() + + deletion.dataSize(); + + for (ColumnData cd : this) + dataSize += cd.dataSize(); + return dataSize; + } + + public long unsharedHeapSizeExcludingData() + { + long heapSize = EMPTY_SIZE + + clustering.unsharedHeapSizeExcludingData() + + BTree.sizeOfStructureOnHeap(btree); + + for (ColumnData cd : this) + heapSize += cd.unsharedHeapSizeExcludingData(); + return heapSize; + } + + public static Row.Builder sortedBuilder() + { + return new Builder(true); + } + + public static Row.Builder unsortedBuilder(int nowInSec) + { + return new Builder(false, nowInSec); + } + + // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general + // assumption that Row objects are immutable. This method should go away post-#6506 in particular. + // This method is in particular not exposed by the Row API on purpose. + // This method also *assumes* that the cell we're setting already exists. + public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value) + { + ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column); + if (column.isSimple()) + BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value)); + else + ((ComplexColumnData) current).setValue(path, value); + } + + public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed) + { + return () -> new CellInLegacyOrderIterator(metadata, reversed); + } + + private class CellIterator extends AbstractIterator<Cell> + { + private Iterator<ColumnData> columnData = iterator(); + private Iterator<Cell> complexCells; + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (!columnData.hasNext()) + return endOfData(); + + ColumnData cd = columnData.next(); + if (cd.column().isComplex()) + complexCells = ((ComplexColumnData)cd).iterator(); + else + return (Cell)cd; + } + } + } + + private class CellInLegacyOrderIterator extends AbstractIterator<Cell> + { + private final Comparator<ByteBuffer> comparator; + private final boolean reversed; + private final int firstComplexIdx; + private int simpleIdx; + private int complexIdx; + private Iterator<Cell> complexCells; + private final Object[] data; + + private CellInLegacyOrderIterator(CFMetaData metadata, boolean reversed) + { + AbstractType<?> nameComparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR); + this.comparator = reversed ? 
Collections.reverseOrder(nameComparator) : nameComparator; + this.reversed = reversed; + + // copy btree into array for simple separate iteration of simple and complex columns + this.data = new Object[BTree.size(btree)]; + BTree.toArray(btree, data, 0); + + int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData); + this.firstComplexIdx = idx < 0 ? data.length : idx; + this.complexIdx = firstComplexIdx; + } + + private int getSimpleIdx() + { + return reversed ? firstComplexIdx - simpleIdx - 1 : simpleIdx; + } + + private int getSimpleIdxAndIncrement() + { + int idx = getSimpleIdx(); + ++simpleIdx; + return idx; + } + + private int getComplexIdx() + { + return reversed ? data.length + firstComplexIdx - complexIdx - 1 : complexIdx; + } + + private int getComplexIdxAndIncrement() + { + int idx = getComplexIdx(); + ++complexIdx; + return idx; + } + + private Iterator<Cell> makeComplexIterator(Object complexData) + { + ComplexColumnData ccd = (ComplexColumnData)complexData; + return reversed ? ccd.reverseIterator() : ccd.iterator(); + } + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (simpleIdx >= firstComplexIdx) + { + if (complexIdx >= data.length) + return endOfData(); + + complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]); + } + else + { + if (complexIdx >= data.length) + return (Cell)data[getSimpleIdxAndIncrement()]; + + if (comparator.compare(((ColumnData) data[getSimpleIdx()]).column().name.bytes, ((ColumnData) data[getComplexIdx()]).column().name.bytes) < 0) + return (Cell)data[getSimpleIdxAndIncrement()]; + else + complexCells = makeComplexIterator(data[getComplexIdxAndIncrement()]); + } + } + } + } + + public static class Builder implements Row.Builder + { + // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time + private static class ComplexColumnDeletion extends BufferCell + { + public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime) + { + super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM); + } + } + + // converts a run of Cell with equal column into a ColumnData + private static class CellResolver implements BTree.Builder.Resolver + { + final int nowInSec; + private CellResolver(int nowInSec) + { + this.nowInSec = nowInSec; + } + + public ColumnData resolve(Object[] cells, int lb, int ub) + { + Cell cell = (Cell) cells[lb]; + ColumnDefinition column = cell.column; + if (cell.column.isSimple()) + { + assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE; + while (++lb < ub) + cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec); + return cell; + } + + // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are + // bedded in, as less important; galloping makes it pretty cheap anyway) + Arrays.sort(cells, lb, ub, (Comparator<Object>) column.cellComparator()); + DeletionTime deletion = DeletionTime.LIVE; + // Deal with complex deletion (for which we've use "fake" ComplexColumnDeletion cells that we need to remove). 
+ // Note that in almost all cases we'll at most one of those fake cell, but the contract of {{Row.Builder.addComplexDeletion}} + // does not forbid it being called twice (especially in the unsorted case) and this can actually happen when reading + // legacy sstables (see #10743). + while (lb < ub) + { + cell = (Cell) cells[lb]; + if (!(cell instanceof ComplexColumnDeletion)) + break; + + if (cell.timestamp() > deletion.markedForDeleteAt()) + deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime()); + lb++; + } + - List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub); - if (deletion != DeletionTime.LIVE) ++ List<Object> buildFrom = new ArrayList<>(ub - lb); ++ Cell previous = null; ++ for (int i = lb; i < ub; i++) + { - // Make sure we don't include any shadowed cells - List<Object> filtered = new ArrayList<>(buildFrom.size()); - for (Object c : buildFrom) ++ Cell c = (Cell) cells[i]; ++ ++ if (deletion == DeletionTime.LIVE || c.timestamp() >= deletion.markedForDeleteAt()) + { - if (((Cell)c).timestamp() >= deletion.markedForDeleteAt()) - filtered.add(c); ++ if (previous != null && column.cellComparator().compare(previous, c) == 0) ++ { ++ c = Cells.reconcile(previous, c, nowInSec); ++ buildFrom.set(buildFrom.size() - 1, c); ++ } ++ else ++ { ++ buildFrom.add(c); ++ } ++ previous = c; + } - buildFrom = filtered; + } ++ + Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp()); + return new ComplexColumnData(column, btree, deletion); + } + + }; + protected Clustering clustering; + protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY; + protected Deletion deletion = Deletion.LIVE; + + private final boolean isSorted; + private final BTree.Builder<Cell> cells; + private final CellResolver resolver; + private boolean hasComplex = false; + + // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion. 
+ + protected Builder(boolean isSorted) + { + this(isSorted, Integer.MIN_VALUE); + } + + protected Builder(boolean isSorted, int nowInSecs) + { + this.cells = BTree.builder(ColumnData.comparator); + resolver = new CellResolver(nowInSecs); + this.isSorted = isSorted; + this.cells.auto(false); + } + ++ protected Builder(Builder builder) ++ { ++ clustering = builder.clustering; ++ primaryKeyLivenessInfo = builder.primaryKeyLivenessInfo; ++ deletion = builder.deletion; ++ cells = builder.cells.copy(); ++ resolver = builder.resolver; ++ isSorted = builder.isSorted; ++ hasComplex = builder.hasComplex; ++ } ++ ++ @Override ++ public Builder copy() ++ { ++ return new Builder(this); ++ } ++ + public boolean isSorted() + { + return isSorted; + } + + public void newRow(Clustering clustering) + { + assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before + this.clustering = clustering; + } + + public Clustering clustering() + { + return clustering; + } + + protected void reset() + { + this.clustering = null; + this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; + this.deletion = Deletion.LIVE; + this.cells.reuse(); ++ this.hasComplex = false; + } + + public void addPrimaryKeyLivenessInfo(LivenessInfo info) + { + // The check is only required for unsorted builders, but it's worth the extra safety to have it unconditional + if (!deletion.deletes(info)) + this.primaryKeyLivenessInfo = info; + } + + public void addRowDeletion(Deletion deletion) + { + this.deletion = deletion; + // The check is only required for unsorted builders, but it's worth the extra safety to have it unconditional + if (deletion.deletes(primaryKeyLivenessInfo)) + this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; + } + + public void addCell(Cell cell) + { + assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; + // In practice, only unsorted builder have to deal with shadowed cells, but it doesn't cost us much to deal with it unconditionally in this case + if (deletion.deletes(cell)) + return; + + cells.add(cell); + hasComplex |= cell.column.isComplex(); + } + + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + cells.add(new ComplexColumnDeletion(column, complexDeletion)); + hasComplex = true; + } + + public Row build() + { + if (!isSorted) + cells.sort(); + // we can avoid resolving if we're sorted and have no complex values + // (because we'll only have unique simple cells, which are already in their final condition) + if (!isSorted | hasComplex) + cells.resolve(resolver); + Object[] btree = cells.build(); + + if (deletion.isShadowedBy(primaryKeyLivenessInfo)) + deletion = Deletion.LIVE; + + int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion.time()); + Row row = new BTreeRow(clustering, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + reset(); + return row; + } + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/Row.java index c7c3216,0000000..74d8664 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@@ -1,690 -1,0 +1,696 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.*; +import java.security.MessageDigest; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.service.paxos.Commit; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.UpdateFunction; + +/** + * Storage engine representation of a row. + * + * A row mainly contains the following informations: + * 1) Its {@code Clustering}, which holds the values for the clustering columns identifying the row. + * 2) Its row level informations: the primary key liveness infos and the row deletion (see + * {@link #primaryKeyLivenessInfo()} and {@link #deletion()} for more details). + * 3) Data for the columns it contains, or in other words, it's a (sorted) collection of + * {@code ColumnData}. + * + * Also note that as for every other storage engine object, a {@code Row} object cannot shadow + * it's own data. For instance, a {@code Row} cannot contains a cell that is deleted by its own + * row deletion. + */ +public interface Row extends Unfiltered, Collection<ColumnData> +{ + /** + * The clustering values for this row. + */ + @Override + public Clustering clustering(); + + /** + * An in-natural-order collection of the columns for which data (incl. simple tombstones) + * is present in this row. + */ + public Collection<ColumnDefinition> columns(); + + /** + * The row deletion. + * + * This correspond to the last row deletion done on this row. + * + * @return the row deletion. + */ + public Deletion deletion(); + + /** + * Liveness information for the primary key columns of this row. + * <p> + * As a row is uniquely identified by its primary key, all its primary key columns + * share the same {@code LivenessInfo}. This liveness information is what allows us + * to distinguish between a dead row (it has no live cells and its primary key liveness + * info is empty) and a live row but where all non PK columns are null (it has no + * live cells, but its primary key liveness is not empty). Please note that the liveness + * info (including it's eventually ttl/local deletion time) only apply to the primary key + * columns and has no impact on the row content. + * <p> + * Note in particular that a row may have live cells but no PK liveness info, because the + * primary key liveness informations are only set on {@code INSERT} (which makes sense + * in itself, see #6782) but live cells can be added through {@code UPDATE} even if the row + * wasn't pre-existing (which users are encouraged not to do, but we can't validate). 
+ */ + public LivenessInfo primaryKeyLivenessInfo(); + + /** + * Whether the row correspond to a static row or not. + * + * @return whether the row correspond to a static row or not. + */ + public boolean isStatic(); + + /** + * Whether the row has no information whatsoever. This means no PK liveness info, no row + * deletion, no cells and no complex deletion info. + * + * @return {@code true} if the row has no data, {@code false} otherwise. + */ + public boolean isEmpty(); + + /** + * Whether the row has some live information (i.e. it's not just deletion informations). + */ + public boolean hasLiveData(int nowInSec); + + /** + * Returns a cell for a simple column. + * + * @param c the simple column for which to fetch the cell. + * @return the corresponding cell or {@code null} if the row has no such cell. + */ + public Cell getCell(ColumnDefinition c); + + /** + * Return a cell for a given complex column and cell path. + * + * @param c the complex column for which to fetch the cell. + * @param path the cell path for which to fetch the cell. + * @return the corresponding cell or {@code null} if the row has no such cell. + */ + public Cell getCell(ColumnDefinition c, CellPath path); + + /** + * The data for a complex column. + * <p> + * The returned object groups all the cells for the column, as well as it's complex deletion (if relevant). + * + * @param c the complex column for which to return the complex data. + * @return the data for {@code c} or {@code null} is the row has no data for this column. + */ + public ComplexColumnData getComplexColumnData(ColumnDefinition c); + + /** + * An iterable over the cells of this row. + * <p> + * The iterable guarantees that cells are returned in order of {@link Cell#comparator}. + * + * @return an iterable over the cells of this row. + */ + public Iterable<Cell> cells(); + + /** + * An iterable over the cells of this row that return cells in "legacy order". + * <p> + * In 3.0+, columns are sorted so that all simple columns are before all complex columns. Previously + * however, the cells where just sorted by the column name. This iterator return cells in that + * legacy order. It's only ever meaningful for backward/thrift compatibility code. + * + * @param metadata the table this is a row of. + * @param reversed if cells should returned in reverse order. + * @return an iterable over the cells of this row in "legacy order". + */ + public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata, boolean reversed); + + /** + * Whether the row stores any (non-live) complex deletion for any complex column. + */ + public boolean hasComplexDeletion(); + + /** + * Whether the row stores any (non-RT) data for any complex column. + */ + boolean hasComplex(); + + /** + * Whether the row has any deletion info (row deletion, cell tombstone, expired cell or complex deletion). + * + * @param nowInSec the current time in seconds to decid if a cell is expired. + */ + public boolean hasDeletion(int nowInSec); + + /** + * An iterator to efficiently search data for a given column. + * + * @return a search iterator for the cells of this row. + */ + public SearchIterator<ColumnDefinition, ColumnData> searchIterator(); + + /** + * Returns a copy of this row that: + * 1) only includes the data for the column included by {@code filter}. + * 2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}). 
+ */ + public Row filter(ColumnFilter filter, CFMetaData metadata); + + /** + * Returns a copy of this row that: + * 1) only includes the data for the column included by {@code filter}. + * 2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}). + * 3) doesn't include any data that is shadowed/deleted by {@code activeDeletion}. + * 4) uses {@code activeDeletion} as row deletion iff {@code setActiveDeletionToRow} and {@code activeDeletion} supersedes the row deletion. + */ + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata); + + /** + * Returns a copy of this row without any deletion info that should be purged according to {@code purger}. + * + * @param purger the {@code DeletionPurger} to use to decide what can be purged. + * @param nowInSec the current time to decide what is deleted and what isn't (in the case of expired cells). + * @return this row but without any deletion info purged by {@code purger}. If the purged row is empty, returns + * {@code null}. + */ + public Row purge(DeletionPurger purger, int nowInSec); + + /** + * Returns a copy of this row where all counter cells have they "local" shard marked for clearing. + */ + public Row markCounterLocalToBeCleared(); + + /** + * Returns a copy of this row where all live timestamp have been replaced by {@code newTimestamp} and every deletion + * timestamp by {@code newTimestamp - 1}. + * + * @param newTimestamp the timestamp to use for all live data in the returned row. + * @param a copy of this row with timestamp updated using {@code newTimestamp}. This can return {@code null} in the + * rare where the row only as a shadowable row deletion and the new timestamp supersedes it. + * + * @see Commit for why we need this. + */ + public Row updateAllTimestamp(long newTimestamp); + + /** + * Returns a copy of this row with the new deletion as row deletion if it is more recent + * than the current row deletion. + * <p> + * WARNING: this method <b>does not</b> check that nothing in the row is shadowed by the provided + * deletion and if that is the case, the created row will be <b>invalid</b>. It is thus up to the + * caller to verify that this is not the case and the only reasonable use case of this is probably + * when the row and the deletion comes from the same {@code UnfilteredRowIterator} since that gives + * use this guarantee. + */ + public Row withRowDeletion(DeletionTime deletion); + + public int dataSize(); + + public long unsharedHeapSizeExcludingData(); + + public String toString(CFMetaData metadata, boolean fullDetails); + + /** + * A row deletion/tombstone. + * <p> + * A row deletion mostly consists of the time of said deletion, but there is 2 variants: shadowable + * and regular row deletion. + * <p> + * A shadowable row deletion only exists if the row has no timestamp. In other words, the deletion is only + * valid as long as no newer insert is done (thus setting a row timestap; note that if the row timestamp set + * is lower than the deletion, it is shadowed (and thus ignored) as usual). + * <p> + * That is, if a row has a shadowable deletion with timestamp A and an update is madeto that row with a + * timestamp B such that B > A (and that update sets the row timestamp), then the shadowable deletion is 'shadowed' + * by that update. 
A concrete consequence is that if said update has cells with timestamp lower than A, then those + * cells are preserved(since the deletion is removed), and this contrarily to a normal (regular) deletion where the + * deletion is preserved and such cells are removed. + * <p> + * Currently, the only use of shadowable row deletions is Materialized Views, see CASSANDRA-10261. + */ + public static class Deletion + { + public static final Deletion LIVE = new Deletion(DeletionTime.LIVE, false); + + private final DeletionTime time; + private final boolean isShadowable; + + public Deletion(DeletionTime time, boolean isShadowable) + { + assert !time.isLive() || !isShadowable; + this.time = time; + this.isShadowable = isShadowable; + } + + public static Deletion regular(DeletionTime time) + { + return time.isLive() ? LIVE : new Deletion(time, false); + } + + public static Deletion shadowable(DeletionTime time) + { + return new Deletion(time, true); + } + + /** + * The time of the row deletion. + * + * @return the time of the row deletion. + */ + public DeletionTime time() + { + return time; + } + + /** + * Whether the deletion is a shadowable one or not. + * + * @return whether the deletion is a shadowable one. Note that if {@code isLive()}, then this is + * guarantee to return {@code false}. + */ + public boolean isShadowable() + { + return isShadowable; + } + + /** + * Wether the deletion is live or not, that is if its an actual deletion or not. + * + * @return {@code true} if this represents no deletion of the row, {@code false} if that's an actual + * deletion. + */ + public boolean isLive() + { + return time().isLive(); + } + + public boolean supersedes(DeletionTime that) + { + return time.supersedes(that); + } + + public boolean supersedes(Deletion that) + { + return time.supersedes(that.time); + } + + public boolean isShadowedBy(LivenessInfo primaryKeyLivenessInfo) + { + return isShadowable && primaryKeyLivenessInfo.timestamp() > time.markedForDeleteAt(); + } + + public boolean deletes(LivenessInfo info) + { + return time.deletes(info); + } + + public boolean deletes(Cell cell) + { + return time.deletes(cell); + } + + public void digest(MessageDigest digest) + { + time.digest(digest); + FBUtilities.updateWithBoolean(digest, isShadowable); + } + + public int dataSize() + { + return time.dataSize() + 1; + } + + @Override + public boolean equals(Object o) + { + if(!(o instanceof Deletion)) + return false; + Deletion that = (Deletion)o; + return this.time.equals(that.time) && this.isShadowable == that.isShadowable; + } + + @Override + public final int hashCode() + { + return Objects.hash(time, isShadowable); + } + + @Override + public String toString() + { + return String.format("%s%s", time, isShadowable ? "(shadowable)" : ""); + } + } + + /** + * Interface for building rows. + * <p> + * The builder of a row should always abid to the following rules: + * 1) {@link #newRow} is always called as the first thing for the row. + * 2) {@link #addPrimaryKeyLivenessInfo} and {@link #addRowDeletion}, if called, are called before + * any {@link #addCell}/{@link #addComplexDeletion} call. + * 3) {@link #build} is called to construct the new row. The builder can then be reused. + * + * There is 2 variants of a builder: sorted and unsorted ones. A sorted builder expects user to abid to the + * following additional rules: + * 4) Calls to {@link #addCell}/{@link #addComplexDeletion} are done in strictly increasing column order. 
+ * In other words, all calls to these methods for a give column {@code c} are done after any call for + * any column before {@code c} and before any call for any column after {@code c}. + * 5) Calls to {@link #addCell} are further done in strictly increasing cell order (the one defined by + * {@link Cell#comparator}. That is, for a give column, cells are passed in {@code CellPath} order. + * 6) No shadowed data should be added. Concretely, this means that if a a row deletion is added, it doesn't + * deletes the row timestamp or any cell added later, and similarly no cell added is deleted by the complex + * deletion of the column this is a cell of. + * + * An unsorted builder will not expect those last rules however: {@link #addCell} and {@link #addComplexDeletion} + * can be done in any order. And in particular unsorted builder allows multiple calls for the same column/cell. In + * that latter case, the result will follow the usual reconciliation rules (so equal cells are reconciled with + * {@link Cells#reconcile} and the "biggest" of multiple complex deletion for the same column wins). + */ + public interface Builder + { + /** ++ * Creates a copy of this {@code Builder}. ++ * @return a copy of this {@code Builder} ++ */ ++ public Builder copy(); ++ ++ /** + * Whether the builder is a sorted one or not. + * + * @return if the builder requires calls to be done in sorted order or not (see above). + */ + public boolean isSorted(); + + /** + * Prepares the builder to build a new row of clustering {@code clustering}. + * <p> + * This should always be the first call for a given row. + * + * @param clustering the clustering for the new row. + */ + public void newRow(Clustering clustering); + + /** + * The clustering for the row that is currently being built. + * + * @return the clustering for the row that is currently being built, or {@code null} if {@link #newRow} hasn't + * yet been called. + */ + public Clustering clustering(); + + /** + * Adds the liveness information for the primary key columns of this row. + * + * This call is optional (skipping it is equivalent to calling {@code addPartitionKeyLivenessInfo(LivenessInfo.NONE)}). + * + * @param info the liveness information for the primary key columns of the built row. + */ + public void addPrimaryKeyLivenessInfo(LivenessInfo info); + + /** + * Adds the deletion information for this row. + * + * This call is optional and can be skipped if the row is not deleted. + * + * @param deletion the row deletion time, or {@code Deletion.LIVE} if the row isn't deleted. + */ + public void addRowDeletion(Deletion deletion); + + /** + * Adds a cell to this builder. + * + * @param cell the cell to add. + */ + public void addCell(Cell cell); + + /** + * Adds a complex deletion. + * + * @param column the column for which to add the {@code complexDeletion}. + * @param complexDeletion the complex deletion time to add. + */ + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion); + + /** + * Builds and return built row. + * + * @return the last row built by this builder. + */ + public Row build(); + } + + /** + * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators). 
+ */ + public static class Merger + { + private final Row[] rows; + private final List<Iterator<ColumnData>> columnDataIterators; + + private Clustering clustering; + private int rowsToMerge; + private int lastRowSet = -1; + + private final List<ColumnData> dataBuffer = new ArrayList<>(); + private final ColumnDataReducer columnDataReducer; + + public Merger(int size, int nowInSec, boolean hasComplex) + { + this.rows = new Row[size]; + this.columnDataIterators = new ArrayList<>(size); + this.columnDataReducer = new ColumnDataReducer(size, nowInSec, hasComplex); + } + + public void clear() + { + dataBuffer.clear(); + Arrays.fill(rows, null); + columnDataIterators.clear(); + rowsToMerge = 0; + lastRowSet = -1; + } + + public void add(int i, Row row) + { + clustering = row.clustering(); + rows[i] = row; + ++rowsToMerge; + lastRowSet = i; + } + + public Row merge(DeletionTime activeDeletion) + { + // If for this clustering we have only one row version and have no activeDeletion (i.e. nothing to filter out), + // then we can just return that single row + if (rowsToMerge == 1 && activeDeletion.isLive()) + { + Row row = rows[lastRowSet]; + assert row != null; + return row; + } + + LivenessInfo rowInfo = LivenessInfo.EMPTY; + Deletion rowDeletion = Deletion.LIVE; + for (Row row : rows) + { + if (row == null) + continue; + + if (row.primaryKeyLivenessInfo().supersedes(rowInfo)) + rowInfo = row.primaryKeyLivenessInfo(); + if (row.deletion().supersedes(rowDeletion)) + rowDeletion = row.deletion(); + } + + if (rowDeletion.isShadowedBy(rowInfo)) + rowDeletion = Deletion.LIVE; + + if (rowDeletion.supersedes(activeDeletion)) + activeDeletion = rowDeletion.time(); + else + rowDeletion = Deletion.LIVE; + + if (activeDeletion.deletes(rowInfo)) + rowInfo = LivenessInfo.EMPTY; + + for (Row row : rows) + columnDataIterators.add(row == null ? Collections.emptyIterator() : row.iterator()); + + columnDataReducer.setActiveDeletion(activeDeletion); + Iterator<ColumnData> merged = MergeIterator.get(columnDataIterators, ColumnData.comparator, columnDataReducer); + while (merged.hasNext()) + { + ColumnData data = merged.next(); + if (data != null) + dataBuffer.add(data); + } + + // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row + return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty() + ? null + : BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp())); + } + + public Clustering mergedClustering() + { + return clustering; + } + + public Row[] mergedRows() + { + return rows; + } + + private static class ColumnDataReducer extends MergeIterator.Reducer<ColumnData, ColumnData> + { + private final int nowInSec; + + private ColumnDefinition column; + private final List<ColumnData> versions; + + private DeletionTime activeDeletion; + + private final ComplexColumnData.Builder complexBuilder; + private final List<Iterator<Cell>> complexCells; + private final CellReducer cellReducer; + + public ColumnDataReducer(int size, int nowInSec, boolean hasComplex) + { + this.nowInSec = nowInSec; + this.versions = new ArrayList<>(size); + this.complexBuilder = hasComplex ? ComplexColumnData.builder() : null; + this.complexCells = hasComplex ? 
new ArrayList<>(size) : null; + this.cellReducer = new CellReducer(nowInSec); + } + + public void setActiveDeletion(DeletionTime activeDeletion) + { + this.activeDeletion = activeDeletion; + } + + public void reduce(int idx, ColumnData data) + { + column = data.column(); + versions.add(data); + } + + protected ColumnData getReduced() + { + if (column.isSimple()) + { + Cell merged = null; + for (ColumnData data : versions) + { + Cell cell = (Cell)data; + if (!activeDeletion.deletes(cell)) + merged = merged == null ? cell : Cells.reconcile(merged, cell, nowInSec); + } + return merged; + } + else + { + complexBuilder.newColumn(column); + complexCells.clear(); + DeletionTime complexDeletion = DeletionTime.LIVE; + for (ColumnData data : versions) + { + ComplexColumnData cd = (ComplexColumnData)data; + if (cd.complexDeletion().supersedes(complexDeletion)) + complexDeletion = cd.complexDeletion(); + complexCells.add(cd.iterator()); + } + + if (complexDeletion.supersedes(activeDeletion)) + { + cellReducer.setActiveDeletion(complexDeletion); + complexBuilder.addComplexDeletion(complexDeletion); + } + else + { + cellReducer.setActiveDeletion(activeDeletion); + } + + Iterator<Cell> cells = MergeIterator.get(complexCells, Cell.comparator, cellReducer); + while (cells.hasNext()) + { + Cell merged = cells.next(); + if (merged != null) + complexBuilder.addCell(merged); + } + return complexBuilder.build(); + } + } + + protected void onKeyChange() + { + versions.clear(); + } + } + + private static class CellReducer extends MergeIterator.Reducer<Cell, Cell> + { + private final int nowInSec; + + private DeletionTime activeDeletion; + private Cell merged; + + public CellReducer(int nowInSec) + { + this.nowInSec = nowInSec; + } + + public void setActiveDeletion(DeletionTime activeDeletion) + { + this.activeDeletion = activeDeletion; + onKeyChange(); + } + + public void reduce(int idx, Cell cell) + { + if (!activeDeletion.deletes(cell)) + merged = merged == null ? cell : Cells.reconcile(merged, cell, nowInSec); + } + + protected Cell getReduced() + { + return merged; + } + + protected void onKeyChange() + { + merged = null; + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/src/java/org/apache/cassandra/utils/btree/BTree.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/utils/btree/BTree.java index fe08011,1145d12..e6e6e40 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@@ -744,301 -361,15 +744,320 @@@ public class BTre } }; - // return a sorted collection - private static <V> Collection<V> sorted(Iterable<V> source, Comparator<V> comparator, int size) + public static <V> Builder<V> builder(Comparator<? super V> comparator) + { + return new Builder<>(comparator); + } + + public static <V> Builder<V> builder(Comparator<? 
super V> comparator, int initialCapacity) { - V[] vs = (V[]) new Object[size]; - int i = 0; - for (V v : source) - vs[i++] = v; - Arrays.sort(vs, comparator); - return Arrays.asList(vs); + return new Builder<>(comparator, initialCapacity); + } + + public static class Builder<V> + { + + // a user-defined bulk resolution, to be applied manually via resolve() + public static interface Resolver + { + // can return a different output type to input, so long as sort order is maintained + // if a resolver is present, this method will be called for every sequence of equal inputs + // even those with only one item + Object resolve(Object[] array, int lb, int ub); + } + + // a user-defined resolver that is applied automatically on encountering two duplicate values + public static interface QuickResolver<V> + { + // can return a different output type to input, so long as sort order is maintained + // this resolver is invoked on each pair of duplicate (equal) values as they are + // encountered, and its result replaces both inputs + V resolve(V a, V b); + } + + Comparator<? super V> comparator; + Object[] values; + int count; + boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added + boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates + QuickResolver<V> quickResolver; + + protected Builder(Comparator<? super V> comparator) + { + this(comparator, 16); + } + + protected Builder(Comparator<? super V> comparator, int initialCapacity) + { + this.comparator = comparator; + this.values = new Object[initialCapacity]; + } + ++ private Builder(Builder<V> builder) ++ { ++ this.comparator = builder.comparator; ++ this.values = Arrays.copyOf(builder.values, builder.values.length); ++ this.count = builder.count; ++ this.detected = builder.detected; ++ this.auto = builder.auto; ++ this.quickResolver = builder.quickResolver; ++ } ++ ++ /** ++ * Creates a copy of this {@code Builder}. ++ * @return a copy of this {@code Builder}. ++ */ ++ public Builder<V> copy() ++ { ++ return new Builder<>(this); ++ } ++ + public Builder<V> setQuickResolver(QuickResolver<V> quickResolver) + { + this.quickResolver = quickResolver; + return this; + } + + public void reuse() + { + reuse(comparator); + } + + public void reuse(Comparator<?
super V> comparator) + { + this.comparator = comparator; + count = 0; + detected = true; + } + + public Builder<V> auto(boolean auto) + { + this.auto = auto; + return this; + } + + public Builder<V> add(V v) + { + if (count == values.length) + values = Arrays.copyOf(values, count * 2); + + Object[] values = this.values; + int prevCount = this.count++; + values[prevCount] = v; + + if (auto && detected && prevCount > 0) + { + V prev = (V) values[prevCount - 1]; + int c = comparator.compare(prev, v); + if (c == 0 && auto) + { + count = prevCount; + if (quickResolver != null) + values[prevCount - 1] = quickResolver.resolve(prev, v); + } + else if (c > 0) + { + detected = false; + } + } + + return this; + } + + public Builder<V> addAll(Collection<V> add) + { + if (auto && add instanceof SortedSet && equalComparators(comparator, ((SortedSet) add).comparator())) + { + // if we're a SortedSet, permit quick order-preserving addition of items + // if we collect all duplicates, don't bother as merge will necessarily be more expensive than sorting at end + return mergeAll(add, add.size()); + } + detected = false; + if (values.length < count + add.size()) + values = Arrays.copyOf(values, max(count + add.size(), count * 2)); + for (V v : add) + values[count++] = v; + return this; + } + + private static boolean equalComparators(Comparator<?> a, Comparator<?> b) + { + return a == b || (isNaturalComparator(a) && isNaturalComparator(b)); + } + + private static boolean isNaturalComparator(Comparator<?> a) + { + return a == null || a == Comparator.naturalOrder() || a == Ordering.natural(); + } + + // iter must be in sorted order! + private Builder<V> mergeAll(Iterable<V> add, int addCount) + { + assert auto; + // ensure the existing contents are in order + autoEnforce(); + + int curCount = count; + // we make room for curCount * 2 + addCount, so that we can copy the current values to the end + // if necessary for continuing the merge, and have the new values directly after the current value range + if (values.length < curCount * 2 + addCount) + values = Arrays.copyOf(values, max(curCount * 2 + addCount, curCount * 3)); + + if (add instanceof BTreeSet) + { + // use btree set's fast toArray method, to append directly + ((BTreeSet) add).toArray(values, curCount); + } + else + { + // consider calling toArray() and System.arraycopy + int i = curCount; + for (V v : add) + values[i++] = v; + } + return mergeAll(addCount); + } + + private Builder<V> mergeAll(int addCount) + { + Object[] a = values; + int addOffset = count; + + int i = 0, j = addOffset; + int curEnd = addOffset, addEnd = addOffset + addCount; + + // save time in cases where we already have a subset, by skipping dir + while (i < curEnd && j < addEnd) + { + V ai = (V) a[i], aj = (V) a[j]; + // in some cases, such as Columns, we may have identity supersets, so perform a cheap object-identity check + int c = ai == aj ? 
0 : comparator.compare(ai, aj); + if (c > 0) + break; + else if (c == 0) + { + if (quickResolver != null) + a[i] = quickResolver.resolve(ai, aj); + j++; + } + i++; + } + + if (j == addEnd) + return this; // already a superset of the new values + + // otherwise, copy the remaining existing values to the very end, freeing up space for merge result + int newCount = i; + System.arraycopy(a, i, a, addEnd, count - i); + curEnd = addEnd + (count - i); + i = addEnd; + + while (i < curEnd && j < addEnd) + { + V ai = (V) a[i]; + V aj = (V) a[j]; + // could avoid one comparison if we cared, but would make this ugly + int c = comparator.compare(ai, aj); + if (c == 0) + { + Object newValue = quickResolver == null ? ai : quickResolver.resolve(ai, aj); + a[newCount++] = newValue; + i++; + j++; + } + else + { + a[newCount++] = c < 0 ? a[i++] : a[j++]; + } + } + + // exhausted one of the inputs; fill in remainder of the other + if (i < curEnd) + { + System.arraycopy(a, i, a, newCount, curEnd - i); + newCount += curEnd - i; + } + else if (j < addEnd) + { + if (j != newCount) + System.arraycopy(a, j, a, newCount, addEnd - j); + newCount += addEnd - j; + } + count = newCount; + return this; + } + + public boolean isEmpty() + { + return count == 0; + } + + public Builder<V> reverse() + { + assert !auto; + int mid = count / 2; + for (int i = 0 ; i < mid ; i++) + { + Object t = values[i]; + values[i] = values[count - (1 + i)]; + values[count - (1 + i)] = t; + } + return this; + } + + public Builder<V> sort() + { + Arrays.sort((V[]) values, 0, count, comparator); + return this; + } + + // automatically enforce sorted+filtered + private void autoEnforce() + { + if (!detected && count > 1) + { + sort(); + int prevIdx = 0; + V prev = (V) values[0]; + for (int i = 1 ; i < count ; i++) + { + V next = (V) values[i]; + if (comparator.compare(prev, next) != 0) + values[++prevIdx] = prev = next; + else if (quickResolver != null) + values[prevIdx] = prev = quickResolver.resolve(prev, next); + } + count = prevIdx + 1; + } + detected = true; + } + + public Builder<V> resolve(Resolver resolver) + { + if (count > 0) + { + int c = 0; + int prev = 0; + for (int i = 1 ; i < count ; i++) + { + if (comparator.compare((V) values[i], (V) values[prev]) != 0) + { + values[c++] = resolver.resolve((V[]) values, prev, i); + prev = i; + } + } + values[c++] = resolver.resolve((V[]) values, prev, count); + count = c; + } + return this; + } + + public Object[] build() + { + if (auto) + autoEnforce(); + return BTree.build(Arrays.asList(values).subList(0, count), UpdateFunction.noOp()); + } } /** simple static wrapper to calls to cmp.compare() which checks if either a or b are Special (i.e. represent an infinity) */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/aeca1d2b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/rows/RowBuilder.java index b1223f1,0000000..ede2ccd mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/rows/RowBuilder.java +++ b/test/unit/org/apache/cassandra/db/rows/RowBuilder.java @@@ -1,84 -1,0 +1,91 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; ++import org.apache.cassandra.db.rows.Row.Builder; +import org.apache.cassandra.utils.Pair; + +/** + * Instrumented Builder implementation for testing the + * behavior of Cells and Rows static methods + */ +public class RowBuilder implements Row.Builder +{ + public List<Cell> cells = new LinkedList<>(); + public Clustering clustering = null; + public LivenessInfo livenessInfo = null; + public Row.Deletion deletionTime = null; + public List<Pair<ColumnDefinition, DeletionTime>> complexDeletions = new LinkedList<>(); + ++ @Override ++ public Builder copy() ++ { ++ throw new UnsupportedOperationException(); ++ } ++ + public void addCell(Cell cell) + { + cells.add(cell); + } + + public boolean isSorted() + { + throw new UnsupportedOperationException(); + } + + public void newRow(Clustering clustering) + { + assert this.clustering == null; + this.clustering = clustering; + } + + public Clustering clustering() + { + return clustering; + } + + public void addPrimaryKeyLivenessInfo(LivenessInfo info) + { + assert livenessInfo == null; + livenessInfo = info; + } + + public void addRowDeletion(Row.Deletion deletion) + { + assert deletionTime == null; + deletionTime = deletion; + } + + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + complexDeletions.add(Pair.create(column, complexDeletion)); + } + + public Row build() + { + throw new UnsupportedOperationException(); + } +}
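
For reference only (not part of the patch): a minimal sketch of how the unsorted Row.Builder contract documented in Row.java above behaves when the same cell is added twice, and where the copy() method introduced by this commit fits in. The identifiers nowInSec, clustering, older and newer (two cells for the same column and CellPath) are assumed to be provided by the surrounding update or test code.

    // Sketch, assuming nowInSec/clustering/older/newer come from the caller.
    Row.Builder builder = BTreeRow.unsortedBuilder(nowInSec);
    builder.newRow(clustering);
    builder.addCell(older);
    builder.addCell(newer);                   // same column and CellPath as 'older': reconciled via Cells.reconcile
    Row.Builder snapshot = builder.copy();    // new in this patch: duplicates the pending builder state
    Row row = builder.build();                // contains the single reconciled cell, not two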
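
Likewise, an illustrative sketch (again not part of the patch) of BTree.Builder duplicate handling with a QuickResolver and the new copy(); the class name and the integer values are arbitrary:

    import java.util.Comparator;
    import org.apache.cassandra.utils.btree.BTree;

    public class BTreeBuilderSketch
    {
        public static void main(String[] args)
        {
            BTree.Builder<Integer> builder = BTree.builder(Comparator.<Integer>naturalOrder());
            builder.setQuickResolver((a, b) -> b);          // keep the later of two equal values
            builder.add(1).add(2).add(2).add(3);            // the duplicate 2 is collapsed by the resolver
            BTree.Builder<Integer> copy = builder.copy();   // new in this patch: copies values, count and flags
            Object[] tree = builder.build();                // opaque b-tree node array holding 1, 2, 3
        }
    }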