http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java new file mode 100644 index 0000000..7e50716 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.function.Predicate; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.utils.AbstractIterator; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; +import org.apache.cassandra.utils.btree.UpdateFunction; + +/** + * Immutable implementation of a Row object. + */ +public class BTreeRow extends AbstractRow +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(emptyRow(Clustering.EMPTY)); + + private final Clustering clustering; + private final Columns columns; + private final LivenessInfo primaryKeyLivenessInfo; + private final DeletionTime deletion; + + // The data for each column present in this row in column sorted order. + private final Object[] btree; + + // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones, and then after reconciliation to remove + // all tombstones since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstone at all, so + // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones, + // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allows at any given + // time to know if we have any deleted information or not. 
If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to + // Integer.MIN_VALUE, but if we don't and have expiring cells, this will be the time at which the first expiring cell expires. If we have no tombstones and + // no expiring cells, this will be Integer.MAX_VALUE; + private final int minLocalDeletionTime; + + private BTreeRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int minLocalDeletionTime) + { + this.clustering = clustering; + this.columns = columns; + this.primaryKeyLivenessInfo = primaryKeyLivenessInfo; + this.deletion = deletion; + this.btree = btree; + this.minLocalDeletionTime = minLocalDeletionTime; + } + + // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation methods below. Only directly useful in a small number of cases. + public static BTreeRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree) + { + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); + if (minDeletionTime != Integer.MIN_VALUE) + { + for (ColumnData cd : BTree.<ColumnData>iterable(btree)) + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); + } + + return new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + } + + public static BTreeRow emptyRow(Clustering clustering) + { + return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE); + } + + public static BTreeRow singleCellRow(Clustering clustering, Cell cell) + { + if (cell.column().isSimple()) + return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), minDeletionTime(cell)); + + ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); 
+ return new BTreeRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), minDeletionTime(cell)); + } + + public static BTreeRow emptyDeletedRow(Clustering clustering, DeletionTime deletion) + { + assert !deletion.isLive(); + return new BTreeRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE); + } + + public static BTreeRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo) + { + assert !primaryKeyLivenessInfo.isEmpty(); + return new BTreeRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), minDeletionTime(primaryKeyLivenessInfo)); + } + + private static int minDeletionTime(Cell cell) + { + return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime(); + } + + private static int minDeletionTime(LivenessInfo info) + { + return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE; + } + + private static int minDeletionTime(DeletionTime dt) + { + return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE; + } + + private static int minDeletionTime(ComplexColumnData cd) + { + int min = minDeletionTime(cd.complexDeletion()); + for (Cell cell : cd) + { + min = Math.min(min, minDeletionTime(cell)); + if (min == Integer.MIN_VALUE) + break; + } + return min; + } + + private static int minDeletionTime(ColumnData cd) + { + return cd.column().isSimple() ? 
minDeletionTime((Cell) cd) : minDeletionTime((ComplexColumnData)cd); + } + + private static int minDeletionTime(Object[] btree, LivenessInfo info, DeletionTime rowDeletion) + { + int min = Math.min(minDeletionTime(info), minDeletionTime(rowDeletion)); + for (ColumnData cd : BTree.<ColumnData>iterable(btree)) + { + min = Math.min(min, minDeletionTime(cd)); + if (min == Integer.MIN_VALUE) + break; + } + return min; + } + + public Clustering clustering() + { + return clustering; + } + + public Columns columns() + { + return columns; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return primaryKeyLivenessInfo; + } + + public boolean isEmpty() + { + return primaryKeyLivenessInfo().isEmpty() + && deletion().isLive() + && BTree.isEmpty(btree); + } + + public DeletionTime deletion() + { + return deletion; + } + + public Cell getCell(ColumnDefinition c) + { + assert !c.isComplex(); + return (Cell) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + assert c.isComplex(); + ComplexColumnData cd = getComplexColumnData(c); + if (cd == null) + return null; + return cd.getCell(path); + } + + public ComplexColumnData getComplexColumnData(ColumnDefinition c) + { + assert c.isComplex(); + return (ComplexColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, c); + } + + public Iterator<ColumnData> iterator() + { + return searchIterator(); + } + + public Iterable<Cell> cells() + { + return CellIterator::new; + } + + public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return BTree.slice(btree, ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC); + } + + public Row filter(ColumnFilter filter, CFMetaData metadata) + { + return filter(filter, DeletionTime.LIVE, false, metadata); + } + + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata) + { + 
Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); + + if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty()) + return this; + + boolean mayHaveShadowed = activeDeletion.supersedes(deletion); + + LivenessInfo newInfo = primaryKeyLivenessInfo; + DeletionTime newDeletion = deletion; + if (mayHaveShadowed) + { + if (activeDeletion.deletes(newInfo.timestamp())) + newInfo = LivenessInfo.EMPTY; + // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if we don't have setActiveDeletionToRow, + // the row deletion is shadowed and we shouldn't return it. + newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE; + } + + Columns columns = filter.fetchedColumns().columns(isStatic()); + Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester(); + return transformAndFilter(newInfo, newDeletion, (cd) -> { + + ColumnDefinition column = cd.column(); + if (!inclusionTester.test(column)) + return null; + + CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes); + if (column.isComplex()) + return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped); + + Cell cell = (Cell)cd; + return (dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)) + ? cell : null; + }); + } + + public boolean hasComplexDeletion() + { + // We start from the end because we know complex columns sort before simple ones + for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC)) + { + if (cd.column().isSimple()) + return false; + + if (!((ComplexColumnData)cd).complexDeletion().isLive()) + return true; + } + return false; + } + + public Row markCounterLocalToBeCleared() + { + return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> cd.column().cellValueType().isCounter() + ? 
cd.markCounterLocalToBeCleared() + : cd); + } + + public boolean hasDeletion(int nowInSec) + { + return nowInSec >= minLocalDeletionTime; + } + + /** + * Returns a copy of the row where all timestamps for live data have been replaced by {@code newTimestamp} and + * all deletion timestamps by {@code newTimestamp - 1}. + * + * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details. + */ + public Row updateAllTimestamp(long newTimestamp) + { + LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp); + DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime()); + + return transformAndFilter(newInfo, newDeletion, (cd) -> cd.updateAllTimestamp(newTimestamp)); + } + + public Row purge(DeletionPurger purger, int nowInSec) + { + if (!hasDeletion(nowInSec)) + return this; + + LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo; + DeletionTime newDeletion = purger.shouldPurge(deletion) ? 
DeletionTime.LIVE : deletion; + + return transformAndFilter(newInfo, newDeletion, (cd) -> cd.purge(purger, nowInSec)); + } + + private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, Function<ColumnData, ColumnData> function) + { + Object[] transformed = BTree.transformAndFilter(btree, function); + + if (btree == transformed && info == this.primaryKeyLivenessInfo && deletion == this.deletion) + return this; + + if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed)) + return null; + + int minDeletionTime = minDeletionTime(transformed, info, deletion); + return new BTreeRow(clustering, columns, info, deletion, transformed, minDeletionTime); + } + + public int dataSize() + { + int dataSize = clustering.dataSize() + + primaryKeyLivenessInfo.dataSize() + + deletion.dataSize(); + + for (ColumnData cd : this) + dataSize += cd.dataSize(); + return dataSize; + } + + public long unsharedHeapSizeExcludingData() + { + long heapSize = EMPTY_SIZE + + clustering.unsharedHeapSizeExcludingData() + + BTree.sizeOfStructureOnHeap(btree); + + for (ColumnData cd : this) + heapSize += cd.unsharedHeapSizeExcludingData(); + return heapSize; + } + + public static Row.Builder sortedBuilder(Columns columns) + { + return new Builder(columns, true); + } + + public static Row.Builder unsortedBuilder(Columns columns, int nowInSec) + { + return new Builder(columns, false, nowInSec); + } + + // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general + // assumption that Row objects are immutable. This method should go away post-#6506 in particular. + // This method is in particular not exposed by the Row API on purpose. + // This method also *assumes* that the cell we're setting already exists. 
+ public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value) + { + ColumnData current = (ColumnData) BTree.<Object>find(btree, ColumnDefinition.asymmetricColumnDataComparator, column); + if (column.isSimple()) + BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) current).withUpdatedValue(value)); + else + ((ComplexColumnData) current).setValue(path, value); + } + + public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata) + { + return () -> new CellInLegacyOrderIterator(metadata); + } + + private class CellIterator extends AbstractIterator<Cell> + { + private Iterator<ColumnData> columnData = iterator(); + private Iterator<Cell> complexCells; + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (!columnData.hasNext()) + return endOfData(); + + ColumnData cd = columnData.next(); + if (cd.column().isComplex()) + complexCells = ((ComplexColumnData)cd).iterator(); + else + return (Cell)cd; + } + } + } + + private class CellInLegacyOrderIterator extends AbstractIterator<Cell> + { + private final AbstractType<?> comparator; + private final int firstComplexIdx; + private int simpleIdx; + private int complexIdx; + private Iterator<Cell> complexCells; + private final Object[] data; + + private CellInLegacyOrderIterator(CFMetaData metadata) + { + this.comparator = metadata.getColumnDefinitionNameComparator(isStatic() ? ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR); + + // copy btree into array for simple separate iteration of simple and complex columns + this.data = new Object[BTree.size(btree)]; + BTree.toArray(btree, data, 0); + + int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd instanceof ComplexColumnData); + this.firstComplexIdx = idx < 0 ? 
data.length : idx; + this.complexIdx = firstComplexIdx; + } + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (simpleIdx >= firstComplexIdx) + { + if (complexIdx >= data.length) + return endOfData(); + + complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); + } + else + { + if (complexIdx >= data.length) + return (Cell)data[simpleIdx++]; + + if (comparator.compare(((ColumnData) data[simpleIdx]).column().name.bytes, ((ColumnData) data[complexIdx]).column().name.bytes) < 0) + return (Cell)data[simpleIdx++]; + else + complexCells = ((ComplexColumnData)data[complexIdx++]).iterator(); + } + } + } + } + + public static class Builder implements Row.Builder + { + // a simple marker class that will sort to the beginning of a run of complex cells to store the deletion time + private static class ComplexColumnDeletion extends BufferCell + { + public ComplexColumnDeletion(ColumnDefinition column, DeletionTime deletionTime) + { + super(column, deletionTime.markedForDeleteAt(), 0, deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, CellPath.BOTTOM); + } + } + + // converts a run of Cell with equal column into a ColumnData + private static class CellResolver implements BTree.Builder.Resolver + { + final int nowInSec; + private CellResolver(int nowInSec) + { + this.nowInSec = nowInSec; + } + + public ColumnData resolve(Object[] cells, int lb, int ub) + { + Cell cell = (Cell) cells[lb]; + ColumnDefinition column = cell.column; + if (cell.column.isSimple()) + { + assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE; + while (++lb < ub) + cell = Cells.reconcile(cell, (Cell) cells[lb], nowInSec); + return cell; + } + + // TODO: relax this in the case our outer provider is sorted (want to delay until remaining changes are + // bedded in, as less important; galloping makes it pretty cheap anyway) + Arrays.sort(cells, lb, ub, 
(Comparator<Object>) column.cellComparator()); + cell = (Cell) cells[lb]; + DeletionTime deletion = DeletionTime.LIVE; + if (cell instanceof ComplexColumnDeletion) + { + // TODO: do we need to be robust to multiple of these being provided? + deletion = new DeletionTime(cell.timestamp(), cell.localDeletionTime()); + lb++; + } + + List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub); + Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp()); + return new ComplexColumnData(column, btree, deletion); + } + + }; + protected final Columns columns; + + protected Clustering clustering; + protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY; + protected DeletionTime deletion = DeletionTime.LIVE; + + private final boolean isSorted; + private final BTree.Builder<Cell> cells; + private final CellResolver resolver; + private boolean hasComplex = false; + + // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion. + + protected Builder(Columns columns, boolean isSorted) + { + this(columns, isSorted, Integer.MIN_VALUE); + } + + protected Builder(Columns columns, boolean isSorted, int nowInSecs) + { + this.columns = columns; + this.cells = BTree.builder(ColumnData.comparator); + resolver = new CellResolver(nowInSecs); + this.isSorted = isSorted; + this.cells.auto(false); + } + + public boolean isSorted() + { + return isSorted; + } + + public void newRow(Clustering clustering) + { + assert this.clustering == null; // Ensures we've properly called build() if we've use this builder before + this.clustering = clustering; + } + + public Clustering clustering() + { + return clustering; + } + + protected void reset() + { + this.clustering = null; + this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; + this.deletion = DeletionTime.LIVE; + this.cells.reuse(); + } + + public void addPrimaryKeyLivenessInfo(LivenessInfo info) + { + this.primaryKeyLivenessInfo = info; + } + + public void addRowDeletion(DeletionTime deletion) 
+ { + this.deletion = deletion; + } + + public void addCell(Cell cell) + { + assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; + cells.add(cell); + hasComplex |= cell.column.isComplex(); + } + + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + cells.add(new ComplexColumnDeletion(column, complexDeletion)); + hasComplex = true; + } + + public Row build() + { + if (!isSorted) + cells.sort(); + // we can avoid resolving if we're sorted and have no complex values + // (because we'll only have unique simple cells, which are already in their final condition) + if (!isSorted | hasComplex) + cells.resolve(resolver); + Object[] btree = cells.build(); + int minDeletionTime = minDeletionTime(btree, primaryKeyLivenessInfo, deletion); + Row row = new BTreeRow(clustering, columns, primaryKeyLivenessInfo, deletion, btree, minDeletionTime); + reset(); + return row; + } + + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/EncodingStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java b/src/java/org/apache/cassandra/db/rows/EncodingStats.java index ca62c47..efa40ad 100644 --- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java +++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java @@ -240,6 +240,17 @@ public class EncodingStats isTTLSet ? minTTL : TTL_EPOCH, isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1); } + + public static EncodingStats collect(Row staticRow, Iterator<Row> rows, DeletionInfo deletionInfo) + { + Collector collector = new Collector(); + deletionInfo.collectStats(collector); + if (!staticRow.isEmpty()) + Rows.collectStats(staticRow, collector); + while (rows.hasNext()) + Rows.collectStats(rows.next(), collector); + return collector.get(); + } } public static class Serializer http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 33ad447..996e89a 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -395,7 +395,7 @@ public interface Row extends Unfiltered, Iterable<ColumnData> // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty() ? 
null - : BTreeBackedRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp())); + : BTreeRow.create(clustering, columns, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.<ColumnData>noOp())); } public Clustering mergedClustering() http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index 766cf19..30f5c50 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -49,7 +49,12 @@ public abstract class RowIterators iterator.next().digest(digest); } - public static RowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder) + public static RowIterator emptyIterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder) + { + return iterator(cfm, partitionKey, isReverseOrder, Collections.emptyIterator()); + } + + public static RowIterator iterator(CFMetaData cfm, DecoratedKey partitionKey, boolean isReverseOrder, Iterator<Row> iterator) { return new RowIterator() { @@ -78,23 +83,16 @@ public abstract class RowIterators return Rows.EMPTY_STATIC_ROW; } + public void close() { } + public boolean hasNext() { - return false; + return iterator.hasNext(); } public Row next() { - throw new NoSuchElementException(); - } - - public void remove() - { - throw new UnsupportedOperationException(); - } - - public void close() - { + return iterator.next(); } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java 
b/src/java/org/apache/cassandra/db/rows/Rows.java index bacd591..0b739a8 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -49,7 +49,7 @@ public abstract class Rows private Rows() {} - public static final Row EMPTY_STATIC_ROW = BTreeBackedRow.emptyRow(Clustering.STATIC_CLUSTERING); + public static final Row EMPTY_STATIC_ROW = BTreeRow.emptyRow(Clustering.STATIC_CLUSTERING); public static Row.Builder copy(Row row, Row.Builder builder) { @@ -217,7 +217,7 @@ public abstract class Rows public static Row merge(Row row1, Row row2, int nowInSec) { Columns mergedColumns = row1.columns().mergeTo(row2.columns()); - Row.Builder builder = BTreeBackedRow.sortedBuilder(mergedColumns); + Row.Builder builder = BTreeRow.sortedBuilder(mergedColumns); merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater); return builder.build(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index f17ccca..2102534 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -204,7 +204,7 @@ public class UnfilteredRowIteratorSerializer final SerializationHeader sHeader = header.sHeader; return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats()) { - private final Row.Builder builder = BTreeBackedRow.sortedBuilder(sHeader.columns().regulars); + private final Row.Builder builder = BTreeRow.sortedBuilder(sHeader.columns().regulars); protected Unfiltered computeNext() { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 650a18d..60f0dcb 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -89,10 +89,14 @@ public abstract class UnfilteredRowIterators return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener); } + public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder) + { + return noRowsIterator(cfm, partitionKey, Rows.EMPTY_STATIC_ROW, DeletionTime.LIVE, isReverseOrder); + } /** * Returns an empty atom iterator for a given partition. */ - public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final boolean isReverseOrder) + public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder) { return new UnfilteredRowIterator() { @@ -118,12 +122,12 @@ public abstract class UnfilteredRowIterators public DeletionTime partitionLevelDeletion() { - return DeletionTime.LIVE; + return partitionDeletion; } public Row staticRow() { - return Rows.EMPTY_STATIC_ROW; + return staticRow; } public EncodingStats stats() @@ -225,7 +229,7 @@ public abstract class UnfilteredRowIterators @Override protected Row computeNextStatic(Row row) { - Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics); + Row.Builder staticBuilder = allocator.cloningBTreeRowBuilder(columns().statics); return Rows.copy(row, staticBuilder).build(); } @@ -233,7 +237,7 @@ public 
abstract class UnfilteredRowIterators protected Row computeNext(Row row) { if (regularBuilder == null) - regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars); + regularBuilder = allocator.cloningBTreeRowBuilder(columns().regulars); return Rows.copy(row, regularBuilder).build(); } @@ -541,7 +545,7 @@ public abstract class UnfilteredRowIterators { Row merged = rowMerger.merge(markerMerger.activeDeletion()); if (listener != null) - listener.onMergedRows(merged == null ? BTreeBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows()); + listener.onMergedRows(merged == null ? BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows()); return merged; } else http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index f306e6d..14b06cf 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -341,7 +341,7 @@ public class UnfilteredSerializer { int flags = in.readUnsignedByte(); assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags; - Row.Builder builder = BTreeBackedRow.sortedBuilder(helper.fetchedStaticColumns(header)); + Row.Builder builder = BTreeRow.sortedBuilder(helper.fetchedStaticColumns(header)); builder.newRow(Clustering.STATIC_CLUSTERING); return deserializeRowBody(in, header, helper, flags, builder); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java index 39b2769..7337e4b 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedView.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java @@ -52,10 +52,10 @@ import org.apache.cassandra.db.ReadOrderGroup; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; +import org.apache.cassandra.db.partitions.AbstractBTreePartition; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.BTreeBackedRow; +import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.ColumnData; import org.apache.cassandra.db.rows.ComplexColumnData; @@ -208,7 +208,7 @@ public class MaterializedView * * @return true if {@param partition} modifies a column included in the view */ - public boolean updateAffectsView(AbstractThreadUnsafePartition partition) + public boolean updateAffectsView(AbstractBTreePartition partition) { // If we are including all of the columns, then any update will be included if (includeAll) @@ -268,7 +268,7 @@ public class MaterializedView int nowInSec) { CFMetaData viewCfm = getViewCfs().metadata; - Row.Builder builder = BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); + Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); builder.newRow(viewClustering(temporalRow, resolver)); builder.addRowDeletion(deletionTime); return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); @@ -286,7 +286,7 @@ public class MaterializedView { CFMetaData viewCfm = getViewCfs().metadata; - Row.Builder builder = 
BTreeBackedRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); + Row.Builder builder = BTreeRow.unsortedBuilder(viewCfm.partitionColumns().regulars, nowInSec); builder.newRow(viewClustering(temporalRow, resolver)); builder.addComplexDeletion(deletedColumn, deletionTime); return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, builder.build()); @@ -363,7 +363,7 @@ public class MaterializedView return null; } - Row.Builder regularBuilder = BTreeBackedRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec); + Row.Builder regularBuilder = BTreeRow.unsortedBuilder(viewCfs.metadata.partitionColumns().regulars, temporalRow.nowInSec); CBuilder clustering = CBuilder.create(viewCfs.getComparator()); for (int i = 0; i < viewCfs.metadata.clusteringColumns().size(); i++) @@ -395,7 +395,7 @@ public class MaterializedView * @return View Tombstones which delete all of the rows which have been removed from the base table with * {@param partition} */ - private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractThreadUnsafePartition partition) + private Collection<Mutation> createForDeletionInfo(TemporalRow.Set rowSet, AbstractBTreePartition partition) { final TemporalRow.Resolver resolver = TemporalRow.earliest; @@ -558,7 +558,7 @@ public class MaterializedView /** * @return Set of rows which are contained in the partition update {@param partition} */ - private TemporalRow.Set separateRows(ByteBuffer key, AbstractThreadUnsafePartition partition) + private TemporalRow.Set separateRows(ByteBuffer key, AbstractBTreePartition partition) { Set<ColumnIdentifier> columns = new HashSet<>(); for (ColumnDefinition def : this.columns.primaryKeyDefs) @@ -578,7 +578,7 @@ public class MaterializedView * have been applied successfully. This is based solely on the changes that are necessary given the current * state of the base table and the newly applying partition data. 
*/ - public Collection<Mutation> createMutations(ByteBuffer key, AbstractThreadUnsafePartition partition, boolean isBuilding) + public Collection<Mutation> createMutations(ByteBuffer key, AbstractBTreePartition partition, boolean isBuilding) { if (!updateAffectsView(partition)) return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java index d0ba5ea..00fdf48 100644 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -45,7 +45,7 @@ import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.RangeTombstone; import org.apache.cassandra.db.Slice; import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; +import org.apache.cassandra.db.partitions.AbstractBTreePartition; import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.CellPath; @@ -309,7 +309,7 @@ public class TemporalRow return null; } - public DeletionTime deletionTime(AbstractThreadUnsafePartition partition) + public DeletionTime deletionTime(AbstractBTreePartition partition) { DeletionInfo deletionInfo = partition.deletionInfo(); if (!deletionInfo.getPartitionDeletion().isLive()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java index 65a5259..7382e5e 100644 --- 
a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java @@ -71,7 +71,7 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> { super(metadata, in, helper); this.header = header; - this.builder = BTreeBackedRow.sortedBuilder(helper.fetchedRegularColumns(header)); + this.builder = BTreeRow.sortedBuilder(helper.fetchedRegularColumns(header)); } public Row readStaticRow() throws IOException http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index c9d9fa5..9213b20 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.UUID; @@ -35,7 +34,6 @@ import javax.management.ObjectName; import com.google.common.util.concurrent.Futures; import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +48,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.partitions.ArrayBackedCachedPartition; +import org.apache.cassandra.db.partitions.CachedBTreePartition; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.context.CounterContext; import 
org.apache.cassandra.io.util.DataInputPlus; @@ -431,7 +429,7 @@ public class CacheService implements CacheServiceMBean int nowInSec = FBUtilities.nowInSeconds(); try (OpOrder.Group op = cfs.readOrdering.start(); UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(cfs.metadata, nowInSec, key).queryMemtableAndDisk(cfs, op)) { - CachedPartition toCache = ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec); + CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec), nowInSec); return Pair.create(new RowCacheKey(cfs.metadata.cfId, key), (IRowCacheEntry)toCache); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 6bfe94a..3dc323e 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -214,7 +214,7 @@ public class DataResolver extends ResponseResolver { if (currentRows[i] == null) { - currentRows[i] = BTreeBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars); + currentRows[i] = BTreeRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? 
columns.statics : columns.regulars); currentRows[i].newRow(clustering); } return currentRows[i]; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index 733067e..cb0667a 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -824,7 +824,7 @@ public class CassandraServer implements Cassandra.Iface { LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_parent.super_column, column.name); Cell cell = cellFromColumn(metadata, name, column); - PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell)); + PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell)); mutation = new org.apache.cassandra.db.Mutation(update); } @@ -1329,7 +1329,7 @@ public class CassandraServer implements Cassandra.Iface } else if (column_path.super_column != null && column_path.column == null) { - Row row = BTreeBackedRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec)); + Row row = BTreeRow.emptyDeletedRow(new Clustering(column_path.super_column), new DeletionTime(timestamp, nowInSec)); update = PartitionUpdate.singleRowUpdate(metadata, dk, row); } else @@ -1339,7 +1339,7 @@ public class CassandraServer implements Cassandra.Iface LegacyLayout.LegacyCellName name = LegacyLayout.decodeCellName(metadata, column_path.super_column, column_path.column); CellPath path = name.collectionElement == null ? 
null : CellPath.create(name.collectionElement); Cell cell = BufferCell.tombstone(name.column, timestamp, nowInSec, path); - update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeBackedRow.singleCellRow(name.clustering, cell)); + update = PartitionUpdate.singleRowUpdate(metadata, dk, BTreeRow.singleCellRow(name.clustering, cell)); } catch (UnknownColumnException e) { @@ -2138,7 +2138,7 @@ public class CassandraServer implements Cassandra.Iface CellPath path = name.collectionElement == null ? null : CellPath.create(name.collectionElement); Cell cell = BufferCell.live(metadata, name.column, FBUtilities.timestampMicros(), value, path); - PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeBackedRow.singleCellRow(name.clustering, cell)); + PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell)); org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update); doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level)))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java index 09e8d4b..084e835 100644 --- a/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java +++ b/src/java/org/apache/cassandra/thrift/ThriftResultsMerger.java @@ -112,7 +112,7 @@ public class ThriftResultsMerger extends WrappingUnfilteredPartitionIterator super(results); assert results.metadata().isStaticCompactTable(); this.nowInSec = nowInSec; - this.builder = BTreeBackedRow.sortedBuilder(results.columns().regulars); + this.builder = BTreeRow.sortedBuilder(results.columns().regulars); } private void init() @@ -220,7 +220,7 @@ public class 
ThriftResultsMerger extends WrappingUnfilteredPartitionIterator this.superColumnMapColumn = results.metadata().compactValueColumn(); assert superColumnMapColumn != null && superColumnMapColumn.type instanceof MapType; - this.builder = BTreeBackedRow.sortedBuilder(Columns.of(superColumnMapColumn)); + this.builder = BTreeRow.sortedBuilder(Columns.of(superColumnMapColumn)); this.columnComparator = ((MapType)superColumnMapColumn.type).nameComparator(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/triggers/TriggerExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java index d75d2f1..40d4094 100644 --- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java +++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java @@ -156,7 +156,7 @@ public class TriggerExecutor return merged; } - private Collection<PartitionUpdate> validateForSinglePartition(UUID cfId, + private List<PartitionUpdate> validateForSinglePartition(UUID cfId, DecoratedKey key, Collection<Mutation> tmutations) throws InvalidRequestException @@ -165,7 +165,7 @@ public class TriggerExecutor if (tmutations.size() == 1) { - Collection<PartitionUpdate> updates = Iterables.getOnlyElement(tmutations).getPartitionUpdates(); + List<PartitionUpdate> updates = Lists.newArrayList(Iterables.getOnlyElement(tmutations).getPartitionUpdates()); if (updates.size() > 1) throw new InvalidRequestException("The updates generated by triggers are not all for the same partition"); validateSamePartition(cfId, key, Iterables.getOnlyElement(updates)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/BTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java 
b/src/java/org/apache/cassandra/utils/btree/BTree.java index 353e7a5..fe08011 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -183,7 +183,7 @@ public class BTree return btree; } - public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<K> comparator) + public static <K> Object[] merge(Object[] tree1, Object[] tree2, Comparator<? super K> comparator, UpdateFunction<K, K> updateF) { if (size(tree1) < size(tree2)) { @@ -191,7 +191,7 @@ public class BTree tree1 = tree2; tree2 = tmp; } - return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), UpdateFunction.<K>noOp()); + return update(tree1, comparator, new BTreeSet<K>(tree2, comparator), updateF); } public static <V> Iterator<V> iterator(Object[] btree) @@ -749,8 +749,15 @@ public class BTree return new Builder<>(comparator); } + public static <V> Builder<V> builder(Comparator<? super V> comparator, int initialCapacity) + { + return new Builder<>(comparator); + } + public static class Builder<V> { + + // a user-defined bulk resolution, to be applied manually via resolve() public static interface Resolver { // can return a different output type to input, so long as sort order is maintained @@ -759,15 +766,37 @@ public class BTree Object resolve(Object[] array, int lb, int ub); } + // a user-defined resolver that is applied automatically on encountering two duplicate values + public static interface QuickResolver<V> + { + // can return a different output type to input, so long as sort order is maintained + // if a resolver is present, this method will be called for every sequence of equal inputs + // even those with only one item + V resolve(V a, V b); + } + Comparator<? 
super V> comparator; - Object[] values = new Object[10]; + Object[] values; int count; - boolean detected; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added + boolean detected = true; // true if we have managed to cheaply ensure sorted (+ filtered, if resolver == null) as we have added boolean auto = true; // false if the user has promised to enforce the sort order and resolve any duplicates + QuickResolver<V> quickResolver; protected Builder(Comparator<? super V> comparator) { + this(comparator, 16); + } + + protected Builder(Comparator<? super V> comparator, int initialCapacity) + { this.comparator = comparator; + this.values = new Object[initialCapacity]; + } + + public Builder<V> setQuickResolver(QuickResolver<V> quickResolver) + { + this.quickResolver = quickResolver; + return this; } public void reuse() @@ -792,14 +821,20 @@ public class BTree { if (count == values.length) values = Arrays.copyOf(values, count * 2); - values[count++] = v; - if (auto && detected && count > 1) + Object[] values = this.values; + int prevCount = this.count++; + values[prevCount] = v; + + if (auto && detected && prevCount > 0) { - int c = comparator.compare((V) values[count - 2], (V) values[count - 1]); + V prev = (V) values[prevCount - 1]; + int c = comparator.compare(prev, v); if (c == 0 && auto) { - count--; + count = prevCount; + if (quickResolver != null) + values[prevCount - 1] = quickResolver.resolve(prev, v); } else if (c > 0) { @@ -881,7 +916,11 @@ public class BTree if (c > 0) break; else if (c == 0) + { + if (quickResolver != null) + a[i] = quickResolver.resolve(ai, aj); j++; + } i++; } @@ -896,11 +935,14 @@ public class BTree while (i < curEnd && j < addEnd) { + V ai = (V) a[i]; + V aj = (V) a[j]; // could avoid one comparison if we cared, but would make this ugly - int c = comparator.compare((V) a[i], (V) a[j]); + int c = comparator.compare(ai, aj); if (c == 0) { - a[newCount++] = a[i]; + Object newValue = 
quickResolver == null ? ai : quickResolver.resolve(ai, aj); + a[newCount++] = newValue; i++; j++; } @@ -931,6 +973,19 @@ public class BTree return count == 0; } + public Builder<V> reverse() + { + assert !auto; + int mid = count / 2; + for (int i = 0 ; i < mid ; i++) + { + Object t = values[i]; + values[i] = values[count - (1 + i)]; + values[count - (1 + i)] = t; + } + return this; + } + public Builder<V> sort() { Arrays.sort((V[]) values, 0, count, comparator); @@ -943,11 +998,17 @@ public class BTree if (!detected && count > 1) { sort(); - int c = 1; + int prevIdx = 0; + V prev = (V) values[0]; for (int i = 1 ; i < count ; i++) - if (comparator.compare((V) values[i], (V) values[i - 1]) != 0) - values[c++] = values[i]; - count = c; + { + V next = (V) values[i]; + if (comparator.compare(prev, next) != 0) + values[++prevIdx] = prev = next; + else if (quickResolver != null) + values[prevIdx] = prev = quickResolver.resolve(prev, next); + } + count = prevIdx + 1; } detected = true; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java index 9a7fa1b..0ab10c2 100644 --- a/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java +++ b/src/java/org/apache/cassandra/utils/btree/UpdateFunction.java @@ -18,6 +18,8 @@ */ package org.apache.cassandra.utils.btree; +import java.util.function.BiFunction; + import com.google.common.base.Function; /** * An interface defining a function to be applied to both the object we are replacing in a BTree and @@ -42,27 +44,26 @@ public interface UpdateFunction<K, V> extends Function<K, V> */ void allocated(long heapSize); - static final UpdateFunction<Object, Object> noOp = new UpdateFunction<Object, Object>() + public static final class Simple<V> implements 
UpdateFunction<V, V> { - public Object apply(Object replacing, Object updating) + private final BiFunction<V, V, V> wrapped; + public Simple(BiFunction<V, V, V> wrapped) { - return updating; + this.wrapped = wrapped; } - public boolean abortEarly() - { - return false; - } + public V apply(V v) { return v; } + public V apply(V replacing, V update) { return wrapped.apply(replacing, update); } + public boolean abortEarly() { return false; } + public void allocated(long heapSize) { } - public void allocated(long heapSize) + public static <V> Simple<V> of(BiFunction<V, V, V> f) { + return new Simple<>(f); } + } - public Object apply(Object k) - { - return k; - } - }; + static final Simple<Object> noOp = Simple.of((a, b) -> a); public static <K> UpdateFunction<K, K> noOp() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java index 8fd470f..a76732f 100644 --- a/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/AbstractAllocator.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.Clustering; import org.apache.cassandra.db.Columns; -import org.apache.cassandra.db.rows.BTreeBackedRow; +import org.apache.cassandra.db.rows.BTreeRow; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.utils.ByteBufferUtil; @@ -46,16 +46,16 @@ public abstract class AbstractAllocator public abstract ByteBuffer allocate(int size); - public Row.Builder cloningArrayBackedRowBuilder(Columns columns) + public Row.Builder cloningBTreeRowBuilder(Columns columns) { - return new CloningArrayBackedRowBuilder(columns, this); + return new 
CloningBTreeRowBuilder(columns, this); } - private static class CloningArrayBackedRowBuilder extends BTreeBackedRow.Builder + private static class CloningBTreeRowBuilder extends BTreeRow.Builder { private final AbstractAllocator allocator; - private CloningArrayBackedRowBuilder(Columns columns, AbstractAllocator allocator) + private CloningBTreeRowBuilder(Columns columns, AbstractAllocator allocator) { super(columns, true); this.allocator = allocator; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java index 31df444..8205f3b 100644 --- a/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtableBufferAllocator.java @@ -20,7 +20,6 @@ package org.apache.cassandra.utils.memory; import java.nio.ByteBuffer; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -35,7 +34,7 @@ public abstract class MemtableBufferAllocator extends MemtableAllocator public Row.Builder rowBuilder(CFMetaData metadata, OpOrder.Group writeOp, boolean isStatic) { Columns columns = isStatic ? 
metadata.partitionColumns().statics : metadata.partitionColumns().regulars; - return allocator(writeOp).cloningArrayBackedRowBuilder(columns); + return allocator(writeOp).cloningBTreeRowBuilder(columns); } public DecoratedKey clone(DecoratedKey key, OpOrder.Group writeOp) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/burn/org/apache/cassandra/utils/LongBTreeTest.java ---------------------------------------------------------------------- diff --git a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java index 37866d2..0e8c467 100644 --- a/test/burn/org/apache/cassandra/utils/LongBTreeTest.java +++ b/test/burn/org/apache/cassandra/utils/LongBTreeTest.java @@ -18,6 +18,9 @@ */ package org.apache.cassandra.utils; +import java.lang.annotation.Annotation; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -54,15 +57,16 @@ import static org.junit.Assert.assertTrue; public class LongBTreeTest { + private static final boolean DEBUG = false; private static int perThreadTrees = 10000; - private static int minTreeSize = 5; - private static int maxTreeSize = 15; - private static final boolean DEBUG = true; + private static int minTreeSize = 4; + private static int maxTreeSize = 10000; + private static int threads = DEBUG ? 1 : Runtime.getRuntime().availableProcessors() * 8; private static final MetricRegistry metrics = new MetricRegistry(); private static final Timer BTREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "BTREE")); private static final Timer TREE_TIMER = metrics.timer(MetricRegistry.name(BTree.class, "TREE")); - private static final ExecutorService MODIFY = Executors.newFixedThreadPool(DEBUG ? 1 : Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("MODIFY")); - private static final ExecutorService COMPARE = DEBUG ? 
MODIFY : Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), new NamedThreadFactory("COMPARE")); + private static final ExecutorService MODIFY = Executors.newFixedThreadPool(threads, new NamedThreadFactory("MODIFY")); + private static final ExecutorService COMPARE = DEBUG ? MODIFY : Executors.newFixedThreadPool(threads, new NamedThreadFactory("COMPARE")); private static final RandomAbort<Integer> SPORADIC_ABORT = new RandomAbort<>(new Random(), 0.0001f); static @@ -316,7 +320,7 @@ public class LongBTreeTest latch.await(1L, TimeUnit.SECONDS); Assert.assertEquals(0, errors.get()); } - System.out.println(String.format("%.0f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : "")); + log("%.1f%% complete %s", 100 * count.get() / (double) totalCount, errors.get() > 0 ? ("Errors: " + errors.get()) : ""); } } @@ -466,8 +470,10 @@ public class LongBTreeTest private static RandomTree randomTree(int minSize, int maxSize) { - return ThreadLocalRandom.current().nextBoolean() ? randomTreeByUpdate(minSize, maxSize) - : randomTreeByBuilder(minSize, maxSize); + // perform most of our tree constructions via update, as this is more efficient; since every run uses this + // we test builder disproportionately more often than if it had its own test anyway + return ThreadLocalRandom.current().nextFloat() < 0.95 ? 
randomTreeByUpdate(minSize, maxSize) + : randomTreeByBuilder(minSize, maxSize); } private static RandomTree randomTreeByUpdate(int minSize, int maxSize) @@ -600,7 +606,7 @@ public class LongBTreeTest TreeSet<Integer> canon = new TreeSet<>(); for (int i = 0 ; i < 10000000 ; i++) canon.add(i); - Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), null); + Object[] btree = BTree.build(Arrays.asList(Integer.MIN_VALUE, Integer.MAX_VALUE), UpdateFunction.noOp()); btree = BTree.update(btree, naturalOrder(), canon, UpdateFunction.<Integer>noOp()); canon.add(Integer.MIN_VALUE); canon.add(Integer.MAX_VALUE); @@ -611,47 +617,61 @@ public class LongBTreeTest @Test public void testIndividualInsertsSmallOverlappingRange() throws ExecutionException, InterruptedException { - testInsertions(10000000, 50, 1, 1, true); + testInsertions(50, 1, 1, true); } @Test public void testBatchesSmallOverlappingRange() throws ExecutionException, InterruptedException { - testInsertions(10000000, 50, 1, 5, true); + testInsertions(50, 1, 5, true); } @Test public void testIndividualInsertsMediumSparseRange() throws ExecutionException, InterruptedException { - testInsertions(10000000, 500, 10, 1, true); + testInsertions(perThreadTrees / 10, 500, 10, 1, true); } @Test public void testBatchesMediumSparseRange() throws ExecutionException, InterruptedException { - testInsertions(10000000, 500, 10, 10, true); + testInsertions(500, 10, 10, true); } @Test public void testLargeBatchesLargeRange() throws ExecutionException, InterruptedException { - testInsertions(100000000, 5000, 3, 100, true); + testInsertions(perThreadTrees / 10, Math.max(maxTreeSize, 5000), 3, 100, true); + } + + @Test + public void testRandomRangeAndBatches() throws ExecutionException, InterruptedException + { + ThreadLocalRandom random = ThreadLocalRandom.current(); + int treeSize = random.nextInt(maxTreeSize / 10, maxTreeSize * 10); + for (int i = 0 ; i < perThreadTrees / 10 ; i++) + testInsertions(threads * 
10, treeSize, random.nextInt(1, 100) / 10f, treeSize / 100, true); } @Test public void testSlicingSmallRandomTrees() throws ExecutionException, InterruptedException { - testInsertions(10000, 50, 10, 10, false); + testInsertions(50, 10, 10, false); } - private static void testInsertions(int totalCount, int perTestCount, int testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException + private static void testInsertions(int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException + { + int tests = perThreadTrees * threads; + testInsertions(tests, perTestCount, testKeyRatio, modificationBatchSize, quickEquality); + } + + private static void testInsertions(int tests, int perTestCount, float testKeyRatio, int modificationBatchSize, boolean quickEquality) throws ExecutionException, InterruptedException { int batchesPerTest = perTestCount / modificationBatchSize; - int maximumRunLength = 100; - int testKeyRange = perTestCount * testKeyRatio; - int tests = totalCount / perTestCount; - System.out.println(String.format("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops", - tests, perTestCount, 1 / (float) testKeyRatio, modificationBatchSize)); + int testKeyRange = (int) (perTestCount * testKeyRatio); + long totalCount = (long) perTestCount * tests; + log("Performing %d tests of %d operations, with %.2f max size/key-range ratio in batches of ~%d ops", + tests, perTestCount, 1 / testKeyRatio, modificationBatchSize); // if we're not doing quick-equality, we can spam with garbage for all the checks we perform, so we'll split the work into smaller chunks int chunkSize = quickEquality ? 
tests : (int) (100000 / Math.pow(perTestCount, 2)); @@ -660,30 +680,33 @@ public class LongBTreeTest final List<ListenableFutureTask<List<ListenableFuture<?>>>> outer = new ArrayList<>(); for (int i = 0 ; i < chunkSize ; i++) { - outer.add(doOneTestInsertions(testKeyRange, maximumRunLength, modificationBatchSize, batchesPerTest, quickEquality)); + int maxRunLength = modificationBatchSize == 1 ? 1 : ThreadLocalRandom.current().nextInt(1, modificationBatchSize); + outer.add(doOneTestInsertions(testKeyRange, maxRunLength, modificationBatchSize, batchesPerTest, quickEquality)); } final List<ListenableFuture<?>> inner = new ArrayList<>(); - int complete = 0; - int reportInterval = totalCount / 100; - int lastReportAt = 0; + long complete = 0; + int reportInterval = Math.max(1000, (int) (totalCount / 10000)); + long lastReportAt = 0; for (ListenableFutureTask<List<ListenableFuture<?>>> f : outer) { inner.addAll(f.get()); complete += perTestCount; if (complete - lastReportAt >= reportInterval) { - System.out.println(String.format("Completed %d of %d operations", (chunk * perTestCount) + complete, totalCount)); + long done = (chunk * perTestCount) + complete; + float ratio = done / (float) totalCount; + log("Completed %.1f%% (%d of %d operations)", ratio * 100, done, totalCount); lastReportAt = complete; } } Futures.allAsList(inner).get(); } Snapshot snap = BTREE_TIMER.getSnapshot(); - System.out.println(String.format("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); + log("btree: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()); snap = TREE_TIMER.getSnapshot(); - System.out.println(String.format("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile())); - System.out.println("Done"); + log("java: %.2fns, %.2fns, %.2fns", snap.getMedian(), snap.get95thPercentile(), snap.get999thPercentile()); + log("Done"); } private static 
ListenableFutureTask<List<ListenableFuture<?>>> doOneTestInsertions(final int upperBound, final int maxRunLength, final int averageModsPerIteration, final int iterations, final boolean quickEquality) @@ -697,11 +720,11 @@ public class LongBTreeTest NavigableMap<Integer, Integer> canon = new TreeMap<>(); Object[] btree = BTree.empty(); final TreeMap<Integer, Integer> buffer = new TreeMap<>(); - final Random rnd = new Random(); + ThreadLocalRandom rnd = ThreadLocalRandom.current(); for (int i = 0 ; i < iterations ; i++) { buffer.clear(); - int mods = (averageModsPerIteration >> 1) + 1 + rnd.nextInt(averageModsPerIteration); + int mods = rnd.nextInt(1, averageModsPerIteration * 2); while (mods > 0) { int v = rnd.nextInt(upperBound); @@ -727,7 +750,7 @@ public class LongBTreeTest if (!BTree.isWellFormed(btree, naturalOrder())) { - System.out.println("ERROR: Not well formed"); + log("ERROR: Not well formed"); throw new AssertionError("Not well formed!"); } if (quickEquality) @@ -754,7 +777,7 @@ public class LongBTreeTest for (int i = 0 ; i < 128 ; i++) { String id = String.format("[0..%d)", canon.size()); - System.out.println("Testing " + id); + log("Testing " + id); Futures.allAsList(testAllSlices(id, cur, canon)).get(); Object[] next = null; while (next == null) @@ -819,7 +842,7 @@ public class LongBTreeTest { if (test != expect) { - System.out.println(String.format("%s: Expected %d, Got %d", id, expect, test)); + log("%s: Expected %d, Got %d", id, expect, test); } } @@ -832,18 +855,18 @@ public class LongBTreeTest Object j = canon.next(); if (!Objects.equals(i, j)) { - System.out.println(String.format("%s: Expected %d, Got %d", id, j, i)); + log("%s: Expected %d, Got %d", id, j, i); equal = false; } } while (btree.hasNext()) { - System.out.println(String.format("%s: Expected <Nil>, Got %d", id, btree.next())); + log("%s: Expected <Nil>, Got %d", id, btree.next()); equal = false; } while (canon.hasNext()) { - System.out.println(String.format("%s: Expected %d, Got 
Nil", id, canon.next())); + log("%s: Expected %d, Got Nil", id, canon.next()); equal = false; } if (!equal) @@ -930,4 +953,58 @@ public class LongBTreeTest return v; } } + + public static void main(String[] args) throws ExecutionException, InterruptedException, InvocationTargetException, IllegalAccessException + { + for (String arg : args) + { + if (arg.startsWith("fan=")) + System.setProperty("cassandra.btree.fanfactor", arg.substring(4)); + else if (arg.startsWith("min=")) + minTreeSize = Integer.parseInt(arg.substring(4)); + else if (arg.startsWith("max=")) + maxTreeSize = Integer.parseInt(arg.substring(4)); + else if (arg.startsWith("count=")) + perThreadTrees = Integer.parseInt(arg.substring(6)); + else + exit(); + } + + List<Method> methods = new ArrayList<>(); + for (Method m : LongBTreeTest.class.getDeclaredMethods()) + { + if (m.getParameters().length > 0) + continue; + for (Annotation annotation : m.getAnnotations()) + if (annotation.annotationType() == Test.class) + methods.add(m); + } + + LongBTreeTest test = new LongBTreeTest(); + Collections.sort(methods, (a, b) -> a.getName().compareTo(b.getName())); + log(Lists.transform(methods, (m) -> m.getName()).toString()); + for (Method m : methods) + { + log(m.getName()); + m.invoke(test); + } + log("success"); + } + + private static void exit() + { + log("usage: fan=<int> min=<int> max=<int> count=<int>"); + log("fan: btree fanout"); + log("min: minimum btree size (must be >= 4)"); + log("max: maximum btree size (must be >= 4)"); + log("count: number of trees to assign each core, for each test"); + } + + private static void log(String formatstr, Object ... 
args) + { + args = Arrays.copyOf(args, args.length + 1); + System.arraycopy(args, 0, args, 1, args.length - 1); + args[0] = System.currentTimeMillis(); + System.out.printf("%tT: " + formatstr + "\n", args); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/Util.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java index 358168f..e8451e0 100644 --- a/test/unit/org/apache/cassandra/Util.java +++ b/test/unit/org/apache/cassandra/Util.java @@ -297,16 +297,16 @@ public class Util } } - public static List<ArrayBackedPartition> getAllUnfiltered(ReadCommand command) + public static List<ImmutableBTreePartition> getAllUnfiltered(ReadCommand command) { - List<ArrayBackedPartition> results = new ArrayList<>(); + List<ImmutableBTreePartition> results = new ArrayList<>(); try (ReadOrderGroup orderGroup = command.startOrderGroup(); UnfilteredPartitionIterator iterator = command.executeLocally(orderGroup)) { while (iterator.hasNext()) { try (UnfilteredRowIterator partition = iterator.next()) { - results.add(ArrayBackedPartition.create(partition)); + results.add(ImmutableBTreePartition.create(partition)); } } } @@ -362,7 +362,7 @@ public class Util } } - public static ArrayBackedPartition getOnlyPartitionUnfiltered(ReadCommand cmd) + public static ImmutableBTreePartition getOnlyPartitionUnfiltered(ReadCommand cmd) { try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iterator = cmd.executeLocally(orderGroup)) { @@ -370,7 +370,7 @@ public class Util try (UnfilteredRowIterator partition = iterator.next()) { assert !iterator.hasNext() : "Expecting a single partition but got more"; - return ArrayBackedPartition.create(partition); + return ImmutableBTreePartition.create(partition); } } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 0448a16..5030029 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -66,16 +66,16 @@ public class CacheProviderTest cfm); } - private ArrayBackedCachedPartition createPartition() + private CachedBTreePartition createPartition() { PartitionUpdate update = new RowUpdateBuilder(cfm, System.currentTimeMillis(), "key1") .add("col1", "val1") .buildUpdate(); - return ArrayBackedCachedPartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds()); + return CachedBTreePartition.create(update.unfilteredIterator(), FBUtilities.nowInSeconds()); } - private void simpleCase(ArrayBackedCachedPartition partition, ICache<MeasureableString, IRowCacheEntry> cache) + private void simpleCase(CachedBTreePartition partition, ICache<MeasureableString, IRowCacheEntry> cache) { cache.put(key1, partition); assertNotNull(cache.get(key1)); @@ -89,15 +89,15 @@ public class CacheProviderTest assertEquals(CAPACITY, cache.size()); } - private void assertDigests(IRowCacheEntry one, ArrayBackedCachedPartition two) + private void assertDigests(IRowCacheEntry one, CachedBTreePartition two) { - assertTrue(one instanceof ArrayBackedCachedPartition); + assertTrue(one instanceof CachedBTreePartition); try { MessageDigest d1 = MessageDigest.getInstance("MD5"); MessageDigest d2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) one).unfilteredIterator(), d1); - UnfilteredRowIterators.digest(((ArrayBackedCachedPartition) two).unfilteredIterator(), d2); + UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), 
d1); + UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2); assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest())); } catch (NoSuchAlgorithmException e) @@ -106,7 +106,7 @@ public class CacheProviderTest } } - private void concurrentCase(final ArrayBackedCachedPartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException + private void concurrentCase(final CachedBTreePartition partition, final ICache<MeasureableString, IRowCacheEntry> cache) throws InterruptedException { final long startTime = System.currentTimeMillis() + 500; Runnable runnable = new Runnable() @@ -140,7 +140,7 @@ public class CacheProviderTest public void testSerializingCache() throws InterruptedException { ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<RefCountedMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer()); - ArrayBackedCachedPartition partition = createPartition(); + CachedBTreePartition partition = createPartition(); simpleCase(partition, cache); concurrentCase(partition, cache); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index 918e10d..721963d 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -754,7 +754,7 @@ public abstract class CQLTester if (!Objects.equal(expectedByteValue, actualValue)) { Object actualValueDecoded = actualValue == null ? 
null : column.type.getSerializer().deserialize(actualValue); - if (!expected[j].equals(actualValueDecoded)) + if (!Objects.equal(expected[j], actualValueDecoded)) Assert.fail(String.format("Invalid value for row %d column %d (%s of type %s), expected <%s> but got <%s>", i, j, http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java index fbb7a5b..8c79689 100644 --- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java @@ -20,9 +20,19 @@ package org.apache.cassandra.db; import static org.apache.cassandra.utils.ByteBufferUtil.bytes; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.net.InetAddress; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.partitions.ImmutableBTreePartition; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import java.io.IOException; -import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ExecutionException; @@ -35,24 +45,17 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; import org.apache.cassandra.Util.PartitionerSwitcher; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import 
org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.LongType; -import org.apache.cassandra.db.partitions.ArrayBackedPartition; -import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; public class BatchlogManagerTest @@ -108,7 +111,7 @@ public class BatchlogManagerTest .applyUnsafe(); DecoratedKey dk = cfs.decorateKey(ByteBufferUtil.bytes("1234")); - ArrayBackedPartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build()); + ImmutableBTreePartition results = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, dk).build()); Iterator<Row> iter = results.iterator(); assert iter.hasNext(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/test/unit/org/apache/cassandra/db/DeletePartitionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java index 6ab9a90..a65befd 100644 --- a/test/unit/org/apache/cassandra/db/DeletePartitionTest.java +++ b/test/unit/org/apache/cassandra/db/DeletePartitionTest.java @@ -86,7 +86,7 @@ public class DeletePartitionTest store.forceBlockingFlush(); // validate removal - ArrayBackedPartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, key).build()); + ImmutableBTreePartition partitionUnfiltered = Util.getOnlyPartitionUnfiltered(Util.cmd(store, 
key).build()); assertFalse(partitionUnfiltered.partitionLevelDeletion().isLive()); assertFalse(partitionUnfiltered.iterator().hasNext()); }
