http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java new file mode 100644 index 0000000..016b4fa --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java @@ -0,0 +1,927 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.function.Predicate; + +import com.google.common.collect.AbstractIterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.ObjectSizes; + +/** + * Immutable implementation of a Row object. + */ +public class ArrayBackedRow extends AbstractRow +{ + private static final ColumnData[] NO_DATA = new ColumnData[0]; + + private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayBackedRow(Clustering.EMPTY, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE)); + + private final Clustering clustering; + private final Columns columns; + private final LivenessInfo primaryKeyLivenessInfo; + private final DeletionTime deletion; + + // The data for each column present in this row, in column-sorted order. + private final int size; + private final ColumnData[] data; + + // We need to filter the tombstones of a row on every read (twice in fact: first to remove purgeable tombstones, and then after reconciliation to remove + // all tombstones since we don't return them to the client) as well as on compaction. But it's likely that many rows won't have any tombstones at all, so + // we want to speed up that case by not having to iterate/copy the row in this case. We could keep a single boolean telling us if we have tombstones, + // but that doesn't work for expiring columns. So instead we keep the deletion time for the first thing in the row to be deleted. This allows us at any given + // time to know if we have any deleted information or not. If we have any "true" tombstone (i.e. not an expiring cell), this value will be forced to + // Integer.MIN_VALUE, but if we don't and have expiring cells, this will be the time at which the first expiring cell expires.
If we have no tombstones and + // no expiring cells, this will be Integer.MAX_VALUE; + private final int minLocalDeletionTime; + + private ArrayBackedRow(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data, int minLocalDeletionTime) + { + this.clustering = clustering; + this.columns = columns; + this.primaryKeyLivenessInfo = primaryKeyLivenessInfo; + this.deletion = deletion; + this.size = size; + this.data = data; + this.minLocalDeletionTime = minLocalDeletionTime; + } + + // Note that it's often easier/safer to use the sortedBuilder/unsortedBuilder or one of the static creation method below. Only directly useful in a small amount of cases. + public static ArrayBackedRow create(Clustering clustering, Columns columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, ColumnData[] data) + { + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); + if (minDeletionTime != Integer.MIN_VALUE) + { + for (int i = 0; i < size; i++) + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(data[i])); + } + + return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime); + } + + public static ArrayBackedRow emptyRow(Clustering clustering) + { + return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE); + } + + public static ArrayBackedRow singleCellRow(Clustering clustering, Cell cell) + { + if (cell.column().isSimple()) + return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ cell }, minDeletionTime(cell)); + + ComplexColumnData complexData = new ComplexColumnData(cell.column(), new Cell[]{ cell }, DeletionTime.LIVE); + return new ArrayBackedRow(clustering, Columns.of(cell.column()), LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ complexData }, minDeletionTime(cell)); + } + + public static ArrayBackedRow emptyDeletedRow(Clustering clustering, DeletionTime deletion) + { + assert !deletion.isLive(); + return new ArrayBackedRow(clustering, Columns.NONE, LivenessInfo.EMPTY, deletion, 0, NO_DATA, Integer.MIN_VALUE); + } + + public static ArrayBackedRow noCellLiveRow(Clustering clustering, LivenessInfo primaryKeyLivenessInfo) + { + assert !primaryKeyLivenessInfo.isEmpty(); + return new ArrayBackedRow(clustering, Columns.NONE, primaryKeyLivenessInfo, DeletionTime.LIVE, 0, NO_DATA, minDeletionTime(primaryKeyLivenessInfo)); + } + + private static int minDeletionTime(Cell cell) + { + return cell.isTombstone() ? Integer.MIN_VALUE : cell.localDeletionTime(); + } + + private static int minDeletionTime(LivenessInfo info) + { + return info.isExpiring() ? info.localExpirationTime() : Integer.MAX_VALUE; + } + + private static int minDeletionTime(DeletionTime dt) + { + return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE; + } + + private static int minDeletionTime(ComplexColumnData cd) + { + int min = minDeletionTime(cd.complexDeletion()); + for (Cell cell : cd) + min = Math.min(min, minDeletionTime(cell)); + return min; + } + + private static int minDeletionTime(ColumnData cd) + { + return cd.column().isSimple() ? 
minDeletionTime((Cell)cd) : minDeletionTime((ComplexColumnData)cd); + } + + public Clustering clustering() + { + return clustering; + } + + public Columns columns() + { + return columns; + } + + public LivenessInfo primaryKeyLivenessInfo() + { + return primaryKeyLivenessInfo; + } + + public DeletionTime deletion() + { + return deletion; + } + + public Cell getCell(ColumnDefinition c) + { + assert !c.isComplex(); + int idx = binarySearch(c); + return idx < 0 ? null : (Cell)data[idx]; + } + + public Cell getCell(ColumnDefinition c, CellPath path) + { + assert c.isComplex(); + int idx = binarySearch(c); + if (idx < 0) + return null; + + return ((ComplexColumnData)data[idx]).getCell(path); + } + + public ComplexColumnData getComplexColumnData(ColumnDefinition c) + { + assert c.isComplex(); + int idx = binarySearch(c); + return idx < 0 ? null : (ComplexColumnData)data[idx]; + } + + public Iterator<ColumnData> iterator() + { + return new ColumnDataIterator(); + } + + public Iterable<Cell> cells() + { + return CellIterator::new; + } + + public SearchIterator<ColumnDefinition, ColumnData> searchIterator() + { + return new ColumnSearchIterator(); + } + + public Row filter(ColumnFilter filter, CFMetaData metadata) + { + return filter(filter, DeletionTime.LIVE, false, metadata); + } + + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata) + { + Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); + + if (filter.includesAllColumns() && (activeDeletion.isLive() || deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty()) + return this; + + boolean mayHaveShadowed = activeDeletion.supersedes(deletion); + + LivenessInfo newInfo = primaryKeyLivenessInfo; + DeletionTime newDeletion = deletion; + if (mayHaveShadowed) + { + if (activeDeletion.deletes(newInfo.timestamp())) + newInfo = LivenessInfo.EMPTY; + // note that mayHaveShadowed means the activeDeletion shadows the row deletion. So if don't have setActiveDeletionToRow, + // the row deletion is shadowed and we shouldn't return it. + newDeletion = setActiveDeletionToRow ? activeDeletion : DeletionTime.LIVE; + } + + ColumnData[] newData = new ColumnData[size]; + int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion)); + Columns columns = filter.fetchedColumns().columns(isStatic()); + Predicate<ColumnDefinition> inclusionTester = columns.inOrderInclusionTester(); + int newSize = 0; + for (int i = 0; i < size; i++) + { + ColumnData cd = data[i]; + ColumnDefinition column = cd.column(); + if (!inclusionTester.test(column)) + continue; + + CFMetaData.DroppedColumn dropped = droppedColumns.get(column.name.bytes); + if (column.isSimple()) + { + Cell cell = (Cell)cd; + if ((dropped == null || cell.timestamp() > dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell))) + { + newData[newSize++] = cell; + newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(cell)); + } + } + else + { + ColumnData newCd = ((ComplexColumnData)cd).filter(filter, mayHaveShadowed ? 
activeDeletion : DeletionTime.LIVE, dropped); + if (newCd != null) + { + newData[newSize++] = newCd; + newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(newCd)); + } + } + } + + if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive()) + return null; + + return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime); + } + + public boolean hasComplexDeletion() + { + // We start from the end because we know complex columns sort after simple ones + for (int i = size - 1; i >= 0; i--) + { + ColumnData cd = data[i]; + if (cd.column().isSimple()) + return false; + + if (!((ComplexColumnData)cd).complexDeletion().isLive()) + return true; + } + return false; + } + + public Row markCounterLocalToBeCleared() + { + ColumnData[] newData = null; + for (int i = 0; i < size; i++) + { + ColumnData cd = data[i]; + ColumnData newCd = cd.column().cellValueType().isCounter() + ? cd.markCounterLocalToBeCleared() + : cd; + if (newCd != cd) + { + if (newData == null) + newData = Arrays.copyOf(data, size); + newData[i] = newCd; + } + } + + return newData == null + ? this + : new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, newData, minLocalDeletionTime); + } + + public boolean hasDeletion(int nowInSec) + { + return nowInSec >= minLocalDeletionTime; + } + + /** + * Returns a copy of the row where all timestamps for live data have been replaced by {@code newTimestamp} and + * all deletion timestamps by {@code newTimestamp - 1}. + * + * This exists for the Paxos path, see {@link PartitionUpdate#updateAllTimestamp} for additional details. + */ + public Row updateAllTimestamp(long newTimestamp) + { + LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? primaryKeyLivenessInfo : primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp); + DeletionTime newDeletion = deletion.isLive() ? deletion : new DeletionTime(newTimestamp - 1, deletion.localDeletionTime()); + + ColumnData[] newData = new ColumnData[size]; + for (int i = 0; i < size; i++) + newData[i] = data[i].updateAllTimestamp(newTimestamp); + + return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, size, newData, minLocalDeletionTime); + } + + public Row purge(DeletionPurger purger, int nowInSec) + { + if (!hasDeletion(nowInSec)) + return this; + + LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo; + DeletionTime newDeletion = purger.shouldPurge(deletion) ?
DeletionTime.LIVE : deletion; + + int newMinDeletionTime = Math.min(minDeletionTime(newInfo), minDeletionTime(newDeletion)); + ColumnData[] newData = new ColumnData[size]; + int newSize = 0; + for (int i = 0; i < size; i++) + { + ColumnData purged = data[i].purge(purger, nowInSec); + if (purged != null) + { + newData[newSize++] = purged; + newMinDeletionTime = Math.min(newMinDeletionTime, minDeletionTime(purged)); + } + } + + if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive()) + return null; + + return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, newSize, newData, newMinDeletionTime); + } + + public int dataSize() + { + int dataSize = clustering.dataSize() + + primaryKeyLivenessInfo.dataSize() + + deletion.dataSize(); + + for (int i = 0; i < size; i++) + dataSize += data[i].dataSize(); + return dataSize; + } + + public long unsharedHeapSizeExcludingData() + { + long heapSize = EMPTY_SIZE + + clustering.unsharedHeapSizeExcludingData() + + ObjectSizes.sizeOfArray(data); + + for (int i = 0; i < size; i++) + heapSize += data[i].unsharedHeapSizeExcludingData(); + return heapSize; + } + + public static Row.Builder sortedBuilder(Columns columns) + { + return new SortedBuilder(columns); + } + + public static Row.Builder unsortedBuilder(Columns columns, int nowInSec) + { + return new UnsortedBuilder(columns, nowInSec); + } + + // This is only used by PartitionUpdate.CounterMark but other uses should be avoided as much as possible as it breaks our general + // assumption that Row objects are immutable. This method should go away post-#6506 in particular. + // This method is in particular not exposed by the Row API on purpose. + // This method also *assumes* that the cell we're setting already exists. + public void setValue(ColumnDefinition column, CellPath path, ByteBuffer value) + { + int idx = binarySearch(column); + assert idx >= 0; + if (column.isSimple()) + data[idx] = ((Cell)data[idx]).withUpdatedValue(value); + else + ((ComplexColumnData)data[idx]).setValue(path, value); + } + + private int binarySearch(ColumnDefinition column) + { + return binarySearch(column, 0, size); + } + + /** + * Simple binary search for a given column (in the data list). + * + * The return value has the exact same meaning that the one of Collections.binarySearch() but + * we don't use the later because we're searching for a 'ColumnDefinition' in an array of 'ColumnData'. + */ + private int binarySearch(ColumnDefinition column, int fromIndex, int toIndex) + { + int low = fromIndex; + int mid = toIndex; + int high = mid - 1; + int result = -1; + while (low <= high) + { + mid = (low + high) >> 1; + if ((result = column.compareTo(data[mid].column())) > 0) + low = mid + 1; + else if (result == 0) + return mid; + else + high = mid - 1; + } + return -mid - (result < 0 ? 1 : 2); + } + + private class ColumnDataIterator extends AbstractIterator<ColumnData> + { + private int i; + + protected ColumnData computeNext() + { + return i < size ? 
data[i++] : endOfData(); + } + } + + private class CellIterator extends AbstractIterator<Cell> + { + private int i; + private Iterator<Cell> complexCells; + + protected Cell computeNext() + { + while (true) + { + if (complexCells != null) + { + if (complexCells.hasNext()) + return complexCells.next(); + + complexCells = null; + } + + if (i >= size) + return endOfData(); + + ColumnData cd = data[i++]; + if (cd.column().isComplex()) + complexCells = ((ComplexColumnData)cd).iterator(); + else + return (Cell)cd; + } + } + } + + private class ColumnSearchIterator implements SearchIterator<ColumnDefinition, ColumnData> + { + // The index at which the next call to "next" should start looking from + private int searchFrom = 0; + + public boolean hasNext() + { + return searchFrom < size; + } + + public ColumnData next(ColumnDefinition column) + { + int idx = binarySearch(column, searchFrom, size); + if (idx < 0) + { + searchFrom = -idx - 1; + return null; + } + else + { + // We've found it. We'll start after it next time. + searchFrom = idx + 1; + return data[idx]; + } + } + } + + private static abstract class AbstractBuilder implements Row.Builder + { + protected final Columns columns; + + protected Clustering clustering; + protected LivenessInfo primaryKeyLivenessInfo; + protected DeletionTime deletion; + + protected List<Cell> cells = new ArrayList<>(); + + // For complex column at index i of 'columns', we store at complexDeletions[i] its complex deletion. + protected DeletionTime[] complexDeletions; + protected int columnsWithComplexDeletion; + + protected AbstractBuilder(Columns columns) + { + this.columns = columns; + this.complexDeletions = new DeletionTime[columns.complexColumnCount()]; + } + + public void newRow(Clustering clustering) + { + assert cells.isEmpty(); // Ensures we've properly called build() if we've use this builder before + this.clustering = clustering; + } + + public Clustering clustering() + { + return clustering; + } + + protected void reset() + { + this.clustering = null; + this.primaryKeyLivenessInfo = LivenessInfo.EMPTY; + this.deletion = DeletionTime.LIVE; + this.cells.clear(); + Arrays.fill(this.complexDeletions, null); + this.columnsWithComplexDeletion = 0; + } + + public void addPrimaryKeyLivenessInfo(LivenessInfo info) + { + this.primaryKeyLivenessInfo = info; + } + + public void addRowDeletion(DeletionTime deletion) + { + this.deletion = deletion; + } + + public void addCell(Cell cell) + { + assert cell.column().isStatic() == (clustering == Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = " + clustering; + cells.add(cell); + } + + public Row build() + { + Row row = buildInternal(); + reset(); + return row; + } + + protected abstract Row buildInternal(); + + protected Row buildNoCells() + { + assert cells.isEmpty(); + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); + if (columnsWithComplexDeletion == 0) + return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, 0, NO_DATA, minDeletionTime); + + ColumnData[] data = new ColumnData[columnsWithComplexDeletion]; + int size = 0; + for (int i = 0; i < complexDeletions.length; i++) + { + DeletionTime complexDeletion = complexDeletions[i]; + if (complexDeletion != null) + { + assert !complexDeletion.isLive(); + data[size++] = new ComplexColumnData(columns.getComplex(i), ComplexColumnData.NO_CELLS, complexDeletion); + minDeletionTime = Integer.MIN_VALUE; + } + } + return new ArrayBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, size, data, minDeletionTime); + } + } + + public static class SortedBuilder extends AbstractBuilder + { + private int columnCount; + + private ColumnDefinition column; + + // The index of the last column for which we've called setColumn if complex. + private int complexColumnIndex; + + // For complex column at index i of 'columns', we store at complexColumnCellsCount[i] its number of added cells. + private final int[] complexColumnCellsCount; + + protected SortedBuilder(Columns columns) + { + super(columns); + this.complexColumnCellsCount = new int[columns.complexColumnCount()]; + reset(); + } + + @Override + protected void reset() + { + super.reset(); + this.column = null; + this.columnCount = 0; + this.complexColumnIndex = -1; + Arrays.fill(this.complexColumnCellsCount, 0); + } + + public boolean isSorted() + { + return true; + } + + private void setColumn(ColumnDefinition column) + { + int cmp = this.column == null ? -1 : this.column.compareTo(column); + assert cmp <= 0 : "current = " + this.column + ", new = " + column; + if (cmp != 0) + { + this.column = column; + ++columnCount; + if (column.isComplex()) + complexColumnIndex = columns.complexIdx(column, complexColumnIndex + 1); + } + } + + @Override + public void addCell(Cell cell) + { + setColumn(cell.column()); + super.addCell(cell); + if (column.isComplex()) + complexColumnCellsCount[complexColumnIndex] += 1; + } + + @Override + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + if (complexDeletion.isLive()) + return; + + setColumn(column); + assert complexDeletions[complexColumnIndex] == null; + complexDeletions[complexColumnIndex] = complexDeletion; + ++columnsWithComplexDeletion; + } + + protected Row buildInternal() + { + if (cells.isEmpty()) + return buildNoCells(); + + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); + + ColumnData[] data = new ColumnData[columnCount]; + int complexIdx = 0; + int i = 0; + int size = 0; + while (i < cells.size()) + { + Cell cell = cells.get(i); + ColumnDefinition column = cell.column(); + if (column.isSimple()) + { + data[size++] = cell; + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cell)); + ++i; + } + else + { + while (columns.getComplex(complexIdx).compareTo(column) < 0) + { + if (complexDeletions[complexIdx] != null) + { + data[size++] = new ComplexColumnData(columns.getComplex(complexIdx), ComplexColumnData.NO_CELLS, complexDeletions[complexIdx]); + minDeletionTime = Integer.MIN_VALUE; + } + ++complexIdx; + } + + DeletionTime complexDeletion = complexDeletions[complexIdx]; + if (complexDeletion != null) + minDeletionTime = Integer.MIN_VALUE; + int cellCount = complexColumnCellsCount[complexIdx]; + Cell[] complexCells = new Cell[cellCount]; + for (int j = 0; j < cellCount; j++) + { + Cell complexCell = cells.get(i + j); + complexCells[j] = complexCell; + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(complexCell)); + } + i += cellCount; + + data[size++] = new ComplexColumnData(column, complexCells, complexDeletion == null ? 
DeletionTime.LIVE : complexDeletion); + ++complexIdx; + } + } + for (int j = complexIdx; j < complexDeletions.length; j++) + { + if (complexDeletions[j] != null) + { + data[size++] = new ComplexColumnData(columns.getComplex(j), ComplexColumnData.NO_CELLS, complexDeletions[j]); + minDeletionTime = Integer.MIN_VALUE; + } + } + assert size == data.length; + return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime); + } + } + + private static class UnsortedBuilder extends AbstractBuilder + { + private final int nowInSec; + + private UnsortedBuilder(Columns columns, int nowInSec) + { + super(columns); + this.nowInSec = nowInSec; + reset(); + } + + public boolean isSorted() + { + return false; + } + + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion) + { + assert column.isComplex(); + assert column.isStatic() == (clustering == Clustering.STATIC_CLUSTERING); + + if (complexDeletion.isLive()) + return; + + int complexColumnIndex = columns.complexIdx(column, 0); + + DeletionTime previous = complexDeletions[complexColumnIndex]; + if (previous == null || complexDeletion.supersedes(previous)) + { + complexDeletions[complexColumnIndex] = complexDeletion; + if (previous == null) + ++columnsWithComplexDeletion; + } + } + + protected Row buildInternal() + { + // First, the easy cases + if (cells.isEmpty()) + return buildNoCells(); + + // Cells have been added in an unsorted way, so sort them first + Collections.sort(cells, Cell.comparator); + + // We now need to + // 1) merge equal cells together + // 2) group the cells for a given complex column together, and include their potential complex deletion time. + // And this without forgetting that some complex columns may have a complex deletion but not cells. + + int addedColumns = countAddedColumns(); + ColumnData[] data = new ColumnData[addedColumns]; + + int nextComplexWithDeletion = findNextComplexWithDeletion(0); + ColumnDefinition previousColumn = null; + + int minDeletionTime = Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion)); + + int i = 0; + int size = 0; + while (i < cells.size()) + { + Cell cell = cells.get(i++); + ColumnDefinition column = cell.column(); + if (column.isSimple()) + { + // Either it's a cell for the same column than our previous cell and we merge them together, or it's a new column + if (previousColumn != null && previousColumn.compareTo(column) == 0) + data[size - 1] = Cells.reconcile((Cell)data[size - 1], cell, nowInSec); + else + data[size++] = cell; + } + else + { + // First, collect the complex deletion time for the column we got the first complex column of. We'll + // also find if there is columns that sorts before but had only a complex deletion and add them. + DeletionTime complexDeletion = DeletionTime.LIVE; + while (nextComplexWithDeletion >= 0) + { + int cmp = column.compareTo(columns.getComplex(nextComplexWithDeletion)); + if (cmp < 0) + { + // This is after the column we're gonna add cell for. We'll deal with it later + break; + } + else if (cmp > 0) + { + // We have a column that only has a complex deletion and no column. Add its data first + data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]); + minDeletionTime = Integer.MIN_VALUE; + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + } + else // cmp == 0 + { + // This is the column we'll about to add cell for. 
Record the deletion time and break to the cell addition + complexDeletion = complexDeletions[nextComplexWithDeletion]; + minDeletionTime = Integer.MIN_VALUE; + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + break; + } + } + + // Find how many cells the complex column has (cellCount) and the index of the next cell that doesn't belong to it (nextColumnIdx). + int nextColumnIdx = i; // i is on cell following the current one + int cellCount = 1; // We have at least the current cell + Cell previousCell = cell; + while (nextColumnIdx < cells.size()) + { + Cell newCell = cells.get(nextColumnIdx); + if (column.compareTo(newCell.column()) != 0) + break; + + ++nextColumnIdx; + if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) != 0) + ++cellCount; + previousCell = newCell; + } + Cell[] columnCells = new Cell[cellCount]; + int complexSize = 0; + columnCells[complexSize++] = cell; + previousCell = cell; + for (int j = i; j < nextColumnIdx; j++) + { + Cell newCell = cells.get(j); + // Either it's a cell for the same path than our previous cell and we merge them together, or it's a new path + if (column.cellPathComparator().compare(previousCell.path(), newCell.path()) == 0) + columnCells[complexSize - 1] = Cells.reconcile(previousCell, newCell, nowInSec); + else + columnCells[complexSize++] = newCell; + previousCell = newCell; + } + i = nextColumnIdx; + + data[size++] = new ComplexColumnData(column, columnCells, complexDeletion); + } + previousColumn = column; + } + // We may still have some complex columns with only a complex deletion + while (nextComplexWithDeletion >= 0) + { + data[size++] = new ComplexColumnData(columns.getComplex(nextComplexWithDeletion), ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]); + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + minDeletionTime = Integer.MIN_VALUE; + } + assert size == addedColumns; + + // Reconciliation made it harder to compute minDeletionTime for cells in the loop above, so just do it now if we need to. + if (minDeletionTime != Integer.MIN_VALUE) + { + for (ColumnData cd : data) + minDeletionTime = Math.min(minDeletionTime, minDeletionTime(cd)); + } + + return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, deletion, size, data, minDeletionTime); + } + + private int findNextComplexWithDeletion(int from) + { + for (int i = from; i < complexDeletions.length; i++) + { + if (complexDeletions[i] != null) + return i; + } + return -1; + } + + // Should only be called once the cells have been sorted + private int countAddedColumns() + { + int columnCount = 0; + int nextComplexWithDeletion = findNextComplexWithDeletion(0); + ColumnDefinition previousColumn = null; + for (Cell cell : cells) + { + if (previousColumn != null && previousColumn.compareTo(cell.column()) == 0) + continue; + + ++columnCount; + previousColumn = cell.column(); + + // We know that simple columns sort before the complex ones, so don't bother with the column having complex deletion + // until we've reached the cells of complex columns. + if (!previousColumn.isComplex()) + continue; + + while (nextComplexWithDeletion >= 0) + { + // Check how the column we just counted compared to the next with complex deletion + int cmp = previousColumn.compareTo(columns.getComplex(nextComplexWithDeletion)); + if (cmp < 0) + { + // it's before, we'll handle nextColumnWithComplexDeletion later + break; + } + else if (cmp > 0) + { + // it's after. 
nextColumnWithComplexDeletion has no cell but we should count it + ++columnCount; + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + } + else // cmp == 0 + { + // it's the column we just counted. Ignore it and we know we're good with nextComplexWithDeletion for this loop + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + break; + } + } + } + // Anything remaining in complexDeletionColumns are complex columns with no cells but some complex deletion + while (nextComplexWithDeletion >= 0) + { + ++columnCount; + nextComplexWithDeletion = findNextComplexWithDeletion(nextComplexWithDeletion + 1); + } + return columnCount; + } + } +}
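The minLocalDeletionTime bookkeeping described in the class comment of ArrayBackedRow above is what lets hasDeletion() and purge() short-circuit, so here is a minimal standalone sketch of the idea. This is an illustration only: ToyCell and the method names around it are invented for the example and are not the Cassandra Cell API. Tombstones force the tracked minimum to Integer.MIN_VALUE, expiring cells contribute their expiration time, plain live cells contribute Integer.MAX_VALUE, and hasDeletion(nowInSec) then reduces to a single comparison.

public class MinDeletionTimeSketch
{
    // A toy stand-in for a cell: either a tombstone, an expiring cell, or a plain live cell.
    static final class ToyCell
    {
        final boolean tombstone;
        final boolean expiring;
        final int localExpirationTime; // only meaningful when expiring == true

        ToyCell(boolean tombstone, boolean expiring, int localExpirationTime)
        {
            this.tombstone = tombstone;
            this.expiring = expiring;
            this.localExpirationTime = localExpirationTime;
        }

        // Tombstones count as "deletable since forever", expiring cells become deletable at
        // their expiration time, and plain live cells never do.
        int minDeletionTime()
        {
            if (tombstone)
                return Integer.MIN_VALUE;
            return expiring ? localExpirationTime : Integer.MAX_VALUE;
        }
    }

    // Computed once when the row is assembled, like the minLocalDeletionTime field above.
    static int minLocalDeletionTime(ToyCell[] cells)
    {
        int min = Integer.MAX_VALUE;
        for (ToyCell c : cells)
            min = Math.min(min, c.minDeletionTime());
        return min;
    }

    // Mirrors the hasDeletion(nowInSec) check: true iff the row contains a tombstone or a cell
    // that has already expired at nowInSec, without walking the cells on every call.
    static boolean hasDeletion(int minLocalDeletionTime, int nowInSec)
    {
        return nowInSec >= minLocalDeletionTime;
    }

    public static void main(String[] args)
    {
        ToyCell live = new ToyCell(false, false, 0);
        ToyCell expiring = new ToyCell(false, true, 1_000);
        ToyCell tombstone = new ToyCell(true, false, 0);

        System.out.println(hasDeletion(minLocalDeletionTime(new ToyCell[]{ live }), 2_000));           // false: nothing deletable
        System.out.println(hasDeletion(minLocalDeletionTime(new ToyCell[]{ live, expiring }), 500));   // false: not expired yet
        System.out.println(hasDeletion(minLocalDeletionTime(new ToyCell[]{ live, expiring }), 2_000)); // true: the expiring cell is past its time
        System.out.println(hasDeletion(minLocalDeletionTime(new ToyCell[]{ tombstone }), 0));          // true: tombstones always need filtering
    }
}

Because the minimum is computed once at row construction, purge() above can return the row unchanged for the common case of a row with nothing deleted or expiring instead of walking every cell on each read or compaction.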
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java new file mode 100644 index 0000000..c339092 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.memory.AbstractAllocator; + +public class BufferCell extends AbstractCell +{ + private static final long EMPTY_SIZE = ObjectSizes.measure(new BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null)); + + private final ColumnDefinition column; + + private final long timestamp; + private final int ttl; + private final int localDeletionTime; + + private final ByteBuffer value; + private final CellPath path; + + public BufferCell(ColumnDefinition column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path) + { + assert column.isComplex() == (path != null); + this.column = column; + this.timestamp = timestamp; + this.ttl = ttl; + this.localDeletionTime = localDeletionTime; + this.value = value; + this.path = path; + } + + public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value) + { + return live(metadata, column, timestamp, value, null); + } + + public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value, CellPath path) + { + if (metadata.getDefaultTimeToLive() != NO_TTL) + return expiring(column, timestamp, metadata.getDefaultTimeToLive(), FBUtilities.nowInSeconds(), value, path); + + return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, value, path); + } + + public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, int nowInSec, ByteBuffer value) + { + return expiring(column, timestamp, ttl, nowInSec, value, null); + } + + public static BufferCell expiring(ColumnDefinition column, long timestamp, int ttl, 
int nowInSec, ByteBuffer value, CellPath path) + { + assert ttl != NO_TTL; + return new BufferCell(column, timestamp, ttl, nowInSec + ttl, value, path); + } + + public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec) + { + return tombstone(column, timestamp, nowInSec, null); + } + + public static BufferCell tombstone(ColumnDefinition column, long timestamp, int nowInSec, CellPath path) + { + return new BufferCell(column, timestamp, NO_TTL, nowInSec, ByteBufferUtil.EMPTY_BYTE_BUFFER, path); + } + + public ColumnDefinition column() + { + return column; + } + + public boolean isCounterCell() + { + return !isTombstone() && column.cellValueType().isCounter(); + } + + public boolean isLive(int nowInSec) + { + return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && nowInSec < localDeletionTime); + } + + public boolean isTombstone() + { + return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL; + } + + public boolean isExpiring() + { + return ttl != NO_TTL; + } + + public long timestamp() + { + return timestamp; + } + + public int ttl() + { + return ttl; + } + + public int localDeletionTime() + { + return localDeletionTime; + } + + public ByteBuffer value() + { + return value; + } + + public CellPath path() + { + return path; + } + + public Cell withUpdatedValue(ByteBuffer newValue) + { + return new BufferCell(column, timestamp, ttl, localDeletionTime, newValue, path); + } + + public Cell copy(AbstractAllocator allocator) + { + if (!value.hasRemaining()) + return this; + + return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator)); + } + + public Cell markCounterLocalToBeCleared() + { + if (!isCounterCell()) + return this; + + ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(value()); + return marked == value() ? this : new BufferCell(column, timestamp, ttl, localDeletionTime, marked, path); + } + + public Cell purge(DeletionPurger purger, int nowInSec) + { + if (!isLive(nowInSec)) + { + if (purger.shouldPurge(timestamp, localDeletionTime)) + return null; + + // We slightly hijack purging to convert expired but not purgeable columns to tombstones. The reason we do that is + // that once a column has expired it is equivalent to a tombstone but actually using a tombstone is more compact since + // we don't keep the column value. The reason we do it here is that 1) it's somewhat related to dealing with tombstones + // so hopefully not too surprising and 2) we want to this and purging at the same places, so it's simpler/more efficient + // to do both here. + if (isExpiring()) + { + // Note that as long as the expiring column and the tombstone put together live longer than GC grace seconds, + // we'll fulfil our responsibility to repair. See discussion at + // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html + return BufferCell.tombstone(column, timestamp, localDeletionTime - ttl); + } + } + return this; + } + + public Cell updateAllTimestamp(long newTimestamp) + { + return new BufferCell(column, isTombstone() ? newTimestamp - 1 : newTimestamp, ttl, localDeletionTime, value, path); + } + + public int dataSize() + { + return TypeSizes.sizeof(timestamp) + + TypeSizes.sizeof(ttl) + + TypeSizes.sizeof(localDeletionTime) + + value.remaining() + + (path == null ? 
0 : path.dataSize()); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path == null ? 0 : path.unsharedHeapSizeExcludingData()); + } + + /** + * The serialization format for a cell is: + * [ flags ][ timestamp ][ deletion time ][ ttl ][ path size ][ path ][ value size ][ value ] + * [ 1b ][ 8b (vint) ][ 4b (vint) ][ 4b (vint) ][ 4b (vint) ][ arb ][ 4b (vint) ][ arb ] + * + * where not all fields are always present (in fact, only the [ flags ] are guaranteed to be present). The fields have the following + * meaning: + * - [ flags ] is the cell flags. It is a byte for which each bit represents a flag whose meaning is explained below (*_MASK constants) + * - [ timestamp ] is the cell timestamp. Present unless the cell has the USE_ROW_TIMESTAMP_MASK. + * - [ deletion time ]: the local deletion time for the cell. Present if either the cell is deleted (IS_DELETED_MASK) + * or it is expiring (IS_EXPIRING_MASK) but doesn't have the USE_ROW_TTL_MASK. + * - [ ttl ]: the ttl for the cell. Present if the cell is expiring (IS_EXPIRING_MASK) but doesn't have the + * USE_ROW_TTL_MASK. + * - [ value size ] is the size of the [ value ] field. It's present unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value + * for columns of this type has a fixed length. + * - [ path size ] is the size of the [ path ] field. Present iff this is the cell of a complex column. + * - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK. + * - [ path ]: the cell path if the column this is a cell of is complex. + */ + static class Serializer implements Cell.Serializer + { + private final static int PRESENCE_MASK = 0x01; // Marks the actual presence of a cell. This is used only when serialized on-disk and + // on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag set). + private final static int IS_DELETED_MASK = 0x02; // Whether the cell is a tombstone or not. + private final static int IS_EXPIRING_MASK = 0x04; // Whether the cell is expiring. + private final static int HAS_EMPTY_VALUE_MASK = 0x08; // Whether the cell has an empty value. This will be the case for tombstones in particular. + private final static int USE_ROW_TIMESTAMP_MASK = 0x10; // Whether the cell has the same timestamp as the row this is a cell of. + private final static int USE_ROW_TTL_MASK = 0x20; // Whether the cell has the same ttl as the row this is a cell of.
+ + public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException + { + if (cell == null) + { + out.writeByte((byte)0); + return; + } + + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); + int flags = PRESENCE_MASK; + if (!hasValue) + flags |= HAS_EMPTY_VALUE_MASK; + + if (isDeleted) + flags |= IS_DELETED_MASK; + else if (isExpiring) + flags |= IS_EXPIRING_MASK; + + if (useRowTimestamp) + flags |= USE_ROW_TIMESTAMP_MASK; + if (useRowTTL) + flags |= USE_ROW_TTL_MASK; + + out.writeByte((byte)flags); + + if (!useRowTimestamp) + out.writeVInt(header.encodeTimestamp(cell.timestamp())); + + if ((isDeleted || isExpiring) && !useRowTTL) + out.writeVInt(header.encodeDeletionTime(cell.localDeletionTime())); + if (isExpiring && !useRowTTL) + out.writeVInt(header.encodeTTL(cell.ttl())); + + if (cell.column().isComplex()) + cell.column().cellPathSerializer().serialize(cell.path(), out); + + if (hasValue) + header.getType(cell.column()).writeValue(cell.value(), out); + } + + public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException + { + int flags = in.readUnsignedByte(); + if ((flags & PRESENCE_MASK) == 0) + return null; + + boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & IS_DELETED_MASK) != 0; + boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; + boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; + + long timestamp = useRowTimestamp ? rowLiveness.timestamp() : header.decodeTimestamp(in.readVInt()); + + int localDeletionTime = useRowTTL + ? rowLiveness.localExpirationTime() + : (isDeleted || isExpiring ? header.decodeDeletionTime((int)in.readVInt()) : NO_DELETION_TIME); + + int ttl = useRowTTL + ? rowLiveness.ttl() + : (isExpiring ? header.decodeTTL((int)in.readVInt()) : NO_TTL); + + CellPath path = column.isComplex() + ? 
column.cellPathSerializer().deserialize(in) + : null; + + boolean isCounter = localDeletionTime == NO_DELETION_TIME && column.type.isCounter(); + + ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER; + if (hasValue) + { + if (helper.canSkipValue(column) || (path != null && helper.canSkipValue(path))) + { + header.getType(column).skipValue(in); + } + else + { + value = header.getType(column).readValue(in); + if (isCounter) + value = helper.maybeClearCounterValue(value); + } + } + + return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); + } + + public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header) + { + long size = 1; // flags + + if (cell == null) + return size; + + boolean hasValue = cell.value().hasRemaining(); + boolean isDeleted = cell.isTombstone(); + boolean isExpiring = cell.isExpiring(); + boolean useRowTimestamp = !rowLiveness.isEmpty() && cell.timestamp() == rowLiveness.timestamp(); + boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == rowLiveness.localExpirationTime(); + + if (!useRowTimestamp) + size += TypeSizes.sizeofVInt(header.encodeTimestamp(cell.timestamp())); + + if ((isDeleted || isExpiring) && !useRowTTL) + size += TypeSizes.sizeofVInt(header.encodeDeletionTime(cell.localDeletionTime())); + if (isExpiring && !useRowTTL) + size += TypeSizes.sizeofVInt(header.encodeTTL(cell.ttl())); + + if (cell.column().isComplex()) + size += cell.column().cellPathSerializer().serializedSize(cell.path()); + + if (hasValue) + size += header.getType(cell.column()).writtenLength(cell.value()); + + return size; + } + + // Returns if the skipped cell was an actual cell (i.e. it had its presence flag). + public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException + { + int flags = in.readUnsignedByte(); + if ((flags & PRESENCE_MASK) == 0) + return false; + + boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0; + boolean isDeleted = (flags & IS_DELETED_MASK) != 0; + boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0; + boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0; + boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0; + + if (!useRowTimestamp) + in.readVInt(); + + if (!useRowTTL && (isDeleted || isExpiring)) + in.readVInt(); + + if (!useRowTTL && isExpiring) + in.readVInt(); + + if (column.isComplex()) + column.cellPathSerializer().skip(in); + + if (hasValue) + header.getType(column).skipValue(in); + + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index 80bf901..ccb9708 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -17,40 +17,41 @@ */ package org.apache.cassandra.db.rows; +import java.io.IOException; import java.nio.ByteBuffer; -import java.security.MessageDigest; +import java.util.Comparator; import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.Aliasable; -import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.utils.memory.AbstractAllocator; /** - * A cell holds a 
single "simple" value for a given column, as well as "liveness" - * informations regarding that value. + * A cell is our atomic unit for a single value of a single column. * <p> - * The is 2 kind of columns: simple ones and complex ones. - * Simple columns have only a single associated cell, while complex ones, - * the one corresponding to non-frozen collections and UDTs, are comprised - * of multiple cells. For complex columns, the different cells are distinguished - * by their cell path. - * <p> - * We can also distinguish different kind of cells based on the property of their - * {@link #livenessInfo}: - * 1) "Normal" cells: their liveness info has no ttl and no deletion time. - * 2) Expiring cells: their liveness info has both a ttl and a deletion time (the latter - * deciding when the cell is actually expired). - * 3) Tombstones/deleted cells: their liveness info has a deletion time but no ttl. Those - * cells don't really have a value but their {@link #value} method return an empty - * buffer by convention. + * A cell always holds at least a timestamp that gives us how the cell reconcile. We then + * have 3 main types of cells: + * 1) live regular cells: those will also have a value and, if for a complex column, a path. + * 2) expiring cells: on top of regular cells, those have a ttl and a local deletion time (when they are expired). + * 3) tombstone cells: those won't have value, but they have a local deletion time (when the tombstone was created). */ -public interface Cell extends Aliasable<Cell> +public interface Cell extends ColumnData { - /** - * The column this cell belongs to. - * - * @return the column this cell belongs to. - */ - public ColumnDefinition column(); + public static final int NO_TTL = 0; + public static final int NO_DELETION_TIME = Integer.MAX_VALUE; + + public final Comparator<Cell> comparator = (c1, c2) -> + { + int cmp = c1.column().compareTo(c2.column()); + if (cmp != 0) + return cmp; + + Comparator<CellPath> pathComparator = c1.column().cellPathComparator(); + return pathComparator == null ? 0 : pathComparator.compare(c1.path(), c2.path()); + }; + + public final Serializer serializer = new BufferCell.Serializer(); /** * Whether the cell is a counter cell or not. @@ -67,12 +68,26 @@ public interface Cell extends Aliasable<Cell> public ByteBuffer value(); /** - * The liveness info of the cell, that is its timestamp and whether it is - * expiring, deleted or none of the above. + * The cell timestamp. + * <p> + * @return the cell timestamp. + */ + public long timestamp(); + + /** + * The cell ttl. + * + * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring one. + */ + public int ttl(); + + /** + * The cell local deletion time. * - * @return the cell {@link LivenessInfo}. + * @return the cell local deletion time, or {@code NO_DELETION_TIME} if the cell is neither + * a tombstone nor an expiring one. */ - public LivenessInfo livenessInfo(); + public int localDeletionTime(); /** * Whether the cell is a tombstone or not. @@ -109,34 +124,27 @@ public interface Cell extends Aliasable<Cell> */ public CellPath path(); - /** - * Write the cell to the provided writer. - * - * @param writer the row writer to write the cell to. - */ - public void writeTo(Row.Writer writer); + public Cell withUpdatedValue(ByteBuffer newValue); - /** - * Adds the cell to the provided digest. - * - * @param digest the {@code MessageDigest} to add the cell to. 
- */ - public void digest(MessageDigest digest); + public Cell copy(AbstractAllocator allocator); - /** - * Validate the cell value. - * - * @throws MarshalException if the cell value is not a valid value for - * the column type this is a cell of. - */ - public void validate(); + @Override + // Overrides super type to provide a more precise return type. + public Cell markCounterLocalToBeCleared(); - /** - * The size of the data hold by this cell. - * - * This is mainly used to verify if batches goes over a given size. - * - * @return the size used by the data of this cell. - */ - public int dataSize(); + @Override + // Overrides super type to provide a more precise return type. + public Cell purge(DeletionPurger purger, int nowInSec); + + public interface Serializer + { + public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException; + + public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException; + + public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header); + + // Returns if the skipped cell was an actual cell (i.e. it had its presence flag). + public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/CellData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/CellData.java b/src/java/org/apache/cassandra/db/rows/CellData.java deleted file mode 100644 index 29eac01..0000000 --- a/src/java/org/apache/cassandra/db/rows/CellData.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.nio.ByteBuffer; -import java.util.Arrays; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.ObjectSizes; - -/** - * Contains (non-counter) cell data for one or more rows. 
- */ -class CellData -{ - private boolean isCounter; - - private ByteBuffer[] values; - private final LivenessInfoArray livenessInfos; - - CellData(int initialCellCapacity, boolean isCounter) - { - this.isCounter = isCounter; - this.values = new ByteBuffer[initialCellCapacity]; - this.livenessInfos = new LivenessInfoArray(initialCellCapacity); - } - - public void setCell(int idx, ByteBuffer value, LivenessInfo info) - { - ensureCapacity(idx); - values[idx] = value; - livenessInfos.set(idx, info); - } - - public boolean hasCell(int idx) - { - return idx < values.length && values[idx] != null; - } - - public ByteBuffer value(int idx) - { - return values[idx]; - } - - public void setValue(int idx, ByteBuffer value) - { - values[idx] = value; - } - - private void ensureCapacity(int idxToSet) - { - int originalCapacity = values.length; - if (idxToSet < originalCapacity) - return; - - int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, idxToSet); - - values = Arrays.copyOf(values, newCapacity); - livenessInfos.resize(newCapacity); - } - - // Swap cell i and j - public void swapCell(int i, int j) - { - ensureCapacity(Math.max(i, j)); - - ByteBuffer value = values[j]; - values[j] = values[i]; - values[i] = value; - - livenessInfos.swap(i, j); - } - - // Merge cell i into j - public void mergeCell(int i, int j, int nowInSec) - { - if (isCounter) - mergeCounterCell(this, i, this, j, this, j, nowInSec); - else - mergeRegularCell(this, i, this, j, this, j, nowInSec); - } - - private static boolean handleNoCellCase(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged) - { - if (!d1.hasCell(i1)) - { - if (d2.hasCell(i2)) - d2.moveCell(i2, merged, iMerged); - return true; - } - if (!d2.hasCell(i2)) - { - d1.moveCell(i1, merged, iMerged); - return true; - } - return false; - } - - public static void mergeRegularCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec) - { - if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged)) - return; - - Conflicts.Resolution res = Conflicts.resolveRegular(d1.livenessInfos.timestamp(i1), - d1.livenessInfos.isLive(i1, nowInSec), - d1.livenessInfos.localDeletionTime(i1), - d1.values[i1], - d2.livenessInfos.timestamp(i2), - d2.livenessInfos.isLive(i2, nowInSec), - d2.livenessInfos.localDeletionTime(i2), - d2.values[i2]); - - assert res != Conflicts.Resolution.MERGE; - if (res == Conflicts.Resolution.LEFT_WINS) - d1.moveCell(i1, merged, iMerged); - else - d2.moveCell(i2, merged, iMerged); - } - - public static void mergeCounterCell(CellData d1, int i1, CellData d2, int i2, CellData merged, int iMerged, int nowInSec) - { - if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged)) - return; - - Conflicts.Resolution res = Conflicts.resolveCounter(d1.livenessInfos.timestamp(i1), - d1.livenessInfos.isLive(i1, nowInSec), - d1.values[i1], - d2.livenessInfos.timestamp(i2), - d2.livenessInfos.isLive(i2, nowInSec), - d2.values[i2]); - - switch (res) - { - case LEFT_WINS: - d1.moveCell(i1, merged, iMerged); - break; - case RIGHT_WINS: - d2.moveCell(i2, merged, iMerged); - break; - default: - merged.values[iMerged] = Conflicts.mergeCounterValues(d1.values[i1], d2.values[i2]); - if (d1.livenessInfos.timestamp(i1) > d2.livenessInfos.timestamp(i2)) - merged.livenessInfos.set(iMerged, d1.livenessInfos.timestamp(i1), d1.livenessInfos.ttl(i1), d1.livenessInfos.localDeletionTime(i1)); - else - merged.livenessInfos.set(iMerged, d2.livenessInfos.timestamp(i2), d2.livenessInfos.ttl(i2), d2.livenessInfos.localDeletionTime(i2)); - 
break; - } - } - - // Move cell i into j - public void moveCell(int i, int j) - { - moveCell(i, this, j); - } - - public void moveCell(int i, CellData target, int j) - { - if (!hasCell(i) || (target == this && i == j)) - return; - - target.ensureCapacity(j); - - target.values[j] = values[i]; - target.livenessInfos.set(j, livenessInfos.timestamp(i), - livenessInfos.ttl(i), - livenessInfos.localDeletionTime(i)); - } - - public int dataSize() - { - int size = livenessInfos.dataSize(); - for (int i = 0; i < values.length; i++) - if (values[i] != null) - size += values[i].remaining(); - return size; - } - - public void clear() - { - Arrays.fill(values, null); - livenessInfos.clear(); - } - - public long unsharedHeapSizeExcludingData() - { - return ObjectSizes.sizeOnHeapExcludingData(values) - + livenessInfos.unsharedHeapSize(); - } - - @Override - public String toString() - { - StringBuilder sb = new StringBuilder(); - sb.append("CellData(size=").append(values.length); - if (isCounter) - sb.append(", counter"); - sb.append("){"); - LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor(); - for (int i = 0; i < values.length; i++) - { - if (values[i] == null) - { - sb.append("[null]"); - continue; - } - sb.append("[len(v)=").append(values[i].remaining()); - sb.append(", info=").append(cursor.setTo(livenessInfos, i)); - sb.append("]"); - } - return sb.append("}").toString(); - } - - static class ReusableCell extends AbstractCell - { - private final LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor(); - - private CellData data; - private ColumnDefinition column; - protected int idx; - - ReusableCell setTo(CellData data, ColumnDefinition column, int idx) - { - if (!data.hasCell(idx)) - return null; - - this.data = data; - this.column = column; - this.idx = idx; - - cursor.setTo(data.livenessInfos, idx); - return this; - } - - public ColumnDefinition column() - { - return column; - } - - public boolean isCounterCell() - { - return data.isCounter && !cursor.hasLocalDeletionTime(); - } - - public ByteBuffer value() - { - return data.value(idx); - } - - public LivenessInfo livenessInfo() - { - return cursor; - } - - public CellPath path() - { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/CellPath.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java b/src/java/org/apache/cassandra/db/rows/CellPath.java index 40d525c..68e3c2b 100644 --- a/src/java/org/apache/cassandra/db/rows/CellPath.java +++ b/src/java/org/apache/cassandra/db/rows/CellPath.java @@ -17,13 +17,16 @@ */ package org.apache.cassandra.db.rows; -import java.io.DataInput; import java.io.IOException; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.Objects; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.memory.AbstractAllocator; /** * A path for a cell belonging to a complex column type (non-frozen collection or UDT). 
@@ -40,7 +43,7 @@ public abstract class CellPath public static CellPath create(ByteBuffer value) { assert value != null; - return new SimpleCellPath(new ByteBuffer[]{ value }); + return new CollectionCellPath(value); } public int dataSize() @@ -57,6 +60,10 @@ public abstract class CellPath digest.update(get(i).duplicate()); } + public abstract CellPath copy(AbstractAllocator allocator); + + public abstract long unsharedHeapSizeExcludingData(); + @Override public final int hashCode() { @@ -86,28 +93,41 @@ public abstract class CellPath public interface Serializer { public void serialize(CellPath path, DataOutputPlus out) throws IOException; - public CellPath deserialize(DataInput in) throws IOException; + public CellPath deserialize(DataInputPlus in) throws IOException; public long serializedSize(CellPath path); - public void skip(DataInput in) throws IOException; + public void skip(DataInputPlus in) throws IOException; } - static class SimpleCellPath extends CellPath + private static class CollectionCellPath extends CellPath { - protected final ByteBuffer[] values; + private static final long EMPTY_SIZE = ObjectSizes.measure(new CollectionCellPath(ByteBufferUtil.EMPTY_BYTE_BUFFER)); - public SimpleCellPath(ByteBuffer[] values) + protected final ByteBuffer value; + + private CollectionCellPath(ByteBuffer value) { - this.values = values; + this.value = value; } public int size() { - return values.length; + return 1; } public ByteBuffer get(int i) { - return values[i]; + assert i == 0; + return value; + } + + public CellPath copy(AbstractAllocator allocator) + { + return new CollectionCellPath(allocator.clone(value)); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value); } } @@ -122,5 +142,15 @@ public abstract class CellPath { throw new UnsupportedOperationException(); } + + public CellPath copy(AbstractAllocator allocator) + { + return this; + } + + public long unsharedHeapSizeExcludingData() + { + return 0; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Cells.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java index 1e329e5..080d640 100644 --- a/src/java/org/apache/cassandra/db/rows/Cells.java +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -23,9 +23,8 @@ import java.util.Iterator; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.utils.ByteBufferUtil; /** * Static methods to work on cells. @@ -35,54 +34,17 @@ public abstract class Cells private Cells() {} /** - * Writes a tombstone cell to the provided writer. + * Collect statistics ont a given cell. * - * @param writer the {@code Row.Writer} to write the tombstone to. - * @param column the column for the tombstone. - * @param timestamp the timestamp for the tombstone. - * @param localDeletionTime the local deletion time (in seconds) for the tombstone. + * @param cell the cell for which to collect stats. + * @param collector the stats collector. 
*/ - public static void writeTombstone(Row.Writer writer, ColumnDefinition column, long timestamp, int localDeletionTime) + public static void collectStats(Cell cell, PartitionStatisticsCollector collector) { - writer.writeCell(column, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime), null); - } - - /** - * Computes the difference between a cell and the result of merging this - * cell to other cells. - * <p> - * This method is used when cells from multiple sources are merged and we want to - * find for a given source if it was up to date for that cell, and if not, what - * should be sent to the source to repair it. - * - * @param merged the cell that is the result of merging multiple source. - * @param cell the cell from one of the source that has been merged to yied - * {@code merged}. - * @return {@code null} if the source having {@code cell} is up-to-date for that - * cell, or a cell that applied to the source will "repair" said source otherwise. - */ - public static Cell diff(Cell merged, Cell cell) - { - // Note that it's enough to check if merged is a counterCell. If it isn't and - // cell is one, it means that merged is a tombstone with a greater timestamp - // than cell, because that's the only case where reconciling a counter with - // a tombstone don't yield a counter. If that's the case, the normal path will - // return what it should. - if (merged.isCounterCell()) - { - if (merged.livenessInfo().supersedes(cell.livenessInfo())) - return merged; + collector.update(cell); - // Reconciliation never returns something with a timestamp strictly lower than its operand. This - // means we're in the case where merged.timestamp() == cell.timestamp(). As 1) tombstones - // always win over counters (CASSANDRA-7346) and 2) merged is a counter, it follows that cell - // can't be a tombstone or merged would be one too. - assert !cell.isTombstone(); - - CounterContext.Relationship rel = CounterContext.instance().diff(merged.value(), cell.value()); - return (rel == CounterContext.Relationship.GREATER_THAN || rel == CounterContext.Relationship.DISJOINT) ? merged : null; - } - return merged.livenessInfo().supersedes(cell.livenessInfo()) ? merged : null; + if (cell.isCounterCell()) + collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell)); } /** @@ -106,7 +68,7 @@ public abstract class Cells * {@code writer}. * @param deletion the deletion time that applies to the cells being considered. * This deletion time may delete both {@code existing} or {@code update}. - * @param writer the row writer to which the result of the reconciliation is written. + * @param builder the row builder to which the result of the reconciliation is written. * @param nowInSec the current time in seconds (which plays a role during reconciliation * because deleted cells always have precedence on timestamp equality and deciding if a * cell is a live or not depends on the current time due to expiring cells). @@ -121,12 +83,12 @@ public abstract class Cells Cell existing, Cell update, DeletionTime deletion, - Row.Writer writer, + Row.Builder builder, int nowInSec, SecondaryIndexManager.Updater indexUpdater) { - existing = existing == null || deletion.deletes(existing.livenessInfo()) ? null : existing; - update = update == null || deletion.deletes(update.livenessInfo()) ? null : update; + existing = existing == null || deletion.deletes(existing) ? null : existing; + update = update == null || deletion.deletes(update) ? 
null : update; if (existing == null || update == null) { if (update != null) @@ -135,17 +97,17 @@ public abstract class Cells // we'll need to fix that damn 2ndary index API to avoid that. updatePKIndexes(clustering, update, nowInSec, indexUpdater); indexUpdater.insert(clustering, update); - update.writeTo(writer); + builder.addCell(update); } else if (existing != null) { - existing.writeTo(writer); + builder.addCell(existing); } return Long.MAX_VALUE; } Cell reconciled = reconcile(existing, update, nowInSec); - reconciled.writeTo(writer); + builder.addCell(reconciled); // Note that this test rely on reconcile returning either 'existing' or 'update'. That's not true for counters but we don't index them if (reconciled == update) @@ -153,13 +115,13 @@ public abstract class Cells updatePKIndexes(clustering, update, nowInSec, indexUpdater); indexUpdater.update(clustering, existing, reconciled); } - return Math.abs(existing.livenessInfo().timestamp() - update.livenessInfo().timestamp()); + return Math.abs(existing.timestamp() - update.timestamp()); } private static void updatePKIndexes(Clustering clustering, Cell cell, int nowInSec, SecondaryIndexManager.Updater indexUpdater) { if (indexUpdater != SecondaryIndexManager.nullUpdater && cell.isLive(nowInSec)) - indexUpdater.maybeIndex(clustering, cell.livenessInfo().timestamp(), cell.livenessInfo().ttl(), DeletionTime.LIVE); + indexUpdater.maybeIndex(clustering, cell.timestamp(), cell.ttl(), DeletionTime.LIVE); } /** @@ -190,10 +152,10 @@ public abstract class Cells if (c1.isCounterCell() || c2.isCounterCell()) { - Conflicts.Resolution res = Conflicts.resolveCounter(c1.livenessInfo().timestamp(), + Conflicts.Resolution res = Conflicts.resolveCounter(c1.timestamp(), c1.isLive(nowInSec), c1.value(), - c2.livenessInfo().timestamp(), + c2.timestamp(), c2.isLive(nowInSec), c2.value()); @@ -203,26 +165,26 @@ public abstract class Cells case RIGHT_WINS: return c2; default: ByteBuffer merged = Conflicts.mergeCounterValues(c1.value(), c2.value()); - LivenessInfo mergedInfo = c1.livenessInfo().mergeWith(c2.livenessInfo()); + long timestamp = Math.max(c1.timestamp(), c2.timestamp()); // We save allocating a new cell object if it turns out that one cell was // a complete superset of the other - if (merged == c1.value() && mergedInfo == c1.livenessInfo()) + if (merged == c1.value() && timestamp == c1.timestamp()) return c1; - else if (merged == c2.value() && mergedInfo == c2.livenessInfo()) + else if (merged == c2.value() && timestamp == c2.timestamp()) return c2; else // merge clocks and timestamps. - return create(c1.column(), true, merged, mergedInfo, null); + return new BufferCell(c1.column(), timestamp, Cell.NO_TTL, Cell.NO_DELETION_TIME, merged, c1.path()); } } - Conflicts.Resolution res = Conflicts.resolveRegular(c1.livenessInfo().timestamp(), + Conflicts.Resolution res = Conflicts.resolveRegular(c1.timestamp(), c1.isLive(nowInSec), - c1.livenessInfo().localDeletionTime(), + c1.localDeletionTime(), c1.value(), - c2.livenessInfo().timestamp(), + c2.timestamp(), c2.isLive(nowInSec), - c2.livenessInfo().localDeletionTime(), + c2.localDeletionTime(), c2.value()); assert res != Conflicts.Resolution.MERGE; return res == Conflicts.Resolution.LEFT_WINS ? c1 : c2; @@ -251,7 +213,7 @@ public abstract class Cells * {@code existing} to {@code writer}. * @param deletion the deletion time that applies to the cells being considered. * This deletion time may delete cells in both {@code existing} and {@code update}. 
- * @param writer the row writer to which the result of the reconciliation is written. + * @param builder the row build to which the result of the reconciliation is written. * @param nowInSec the current time in seconds (which plays a role during reconciliation * because deleted cells always have precedence on timestamp equality and deciding if a * cell is a live or not depends on the current time due to expiring cells). @@ -270,7 +232,7 @@ public abstract class Cells Iterator<Cell> existing, Iterator<Cell> update, DeletionTime deletion, - Row.Writer writer, + Row.Builder builder, int nowInSec, SecondaryIndexManager.Updater indexUpdater) { @@ -285,17 +247,17 @@ public abstract class Cells : comparator.compare(nextExisting.path(), nextUpdate.path())); if (cmp < 0) { - reconcile(clustering, nextExisting, null, deletion, writer, nowInSec, indexUpdater); + reconcile(clustering, nextExisting, null, deletion, builder, nowInSec, indexUpdater); nextExisting = getNext(existing); } else if (cmp > 0) { - reconcile(clustering, null, nextUpdate, deletion, writer, nowInSec, indexUpdater); + reconcile(clustering, null, nextUpdate, deletion, builder, nowInSec, indexUpdater); nextUpdate = getNext(update); } else { - timeDelta = Math.min(timeDelta, reconcile(clustering, nextExisting, nextUpdate, deletion, writer, nowInSec, indexUpdater)); + timeDelta = Math.min(timeDelta, reconcile(clustering, nextExisting, nextUpdate, deletion, builder, nowInSec, indexUpdater)); nextExisting = getNext(existing); nextUpdate = getNext(update); } @@ -307,65 +269,4 @@ public abstract class Cells { return iterator == null || !iterator.hasNext() ? null : iterator.next(); } - - /** - * Creates a simple cell. - * <p> - * Note that in general cell objects are created by the container they are in and so this method should - * only be used in a handful of cases when we know it's the right thing to do. - * - * @param column the column for the cell to create. - * @param isCounter whether the create cell should be a counter one. - * @param value the value for the cell. - * @param info the liveness info for the cell. - * @param path the cell path for the cell. - * @return the newly allocated cell object. - */ - public static Cell create(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - return new SimpleCell(column, isCounter, value, info, path); - } - - private static class SimpleCell extends AbstractCell - { - private final ColumnDefinition column; - private final boolean isCounter; - private final ByteBuffer value; - private final LivenessInfo info; - private final CellPath path; - - private SimpleCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - this.column = column; - this.isCounter = isCounter; - this.value = value; - this.info = info.takeAlias(); - this.path = path; - } - - public ColumnDefinition column() - { - return column; - } - - public boolean isCounterCell() - { - return isCounter; - } - - public ByteBuffer value() - { - return value; - } - - public LivenessInfo livenessInfo() - { - return info; - } - - public CellPath path() - { - return path; - } - } }
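The rewritten reconcile(Cell, Cell, int) above resolves two versions of the same cell directly from the cell's own timestamp()/ttl()/localDeletionTime() accessors rather than a separate LivenessInfo object: regular cells are resolved by Conflicts.resolveRegular so that one side always wins outright, while counter cells may be merged by combining their values with Conflicts.mergeCounterValues and taking the larger timestamp for the resulting BufferCell. A minimal, self-contained sketch of the regular-cell rule, using a simplified stand-in cell type (the remaining tie-break performed by the real code is elided here):

    import java.nio.ByteBuffer;

    final class ReconcileSketch
    {
        static final class SimpleCell
        {
            final long timestamp;
            final boolean live;      // false == tombstone, or expired at the query's nowInSec
            final ByteBuffer value;

            SimpleCell(long timestamp, boolean live, ByteBuffer value)
            {
                this.timestamp = timestamp;
                this.live = live;
                this.value = value;
            }
        }

        // Returns the surviving cell; regular reconciliation never allocates a new cell.
        static SimpleCell reconcileRegular(SimpleCell left, SimpleCell right)
        {
            if (left.timestamp != right.timestamp)
                return left.timestamp > right.timestamp ? left : right;
            if (left.live != right.live)
                return left.live ? right : left;     // deletes win on timestamp ties
            // Further tie-breaking (a value comparison in the real code) is elided here.
            return left;
        }
    }
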

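For completeness, reconcileComplex above walks two path-sorted cell iterators in lockstep, reconciling cells that share a cell path and passing the others through to the builder unchanged. A standalone sketch of that merge loop, with generic stand-in types and a simplified drain step in place of the null-aware comparator used in the diff:

    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.function.BinaryOperator;
    import java.util.function.Consumer;

    final class SortedMergeSketch
    {
        static <C> void merge(Iterator<C> existing,
                              Iterator<C> update,
                              Comparator<C> byPath,
                              BinaryOperator<C> reconcile,
                              Consumer<C> out)
        {
            C nextExisting = next(existing);
            C nextUpdate = next(update);
            while (nextExisting != null && nextUpdate != null)
            {
                int cmp = byPath.compare(nextExisting, nextUpdate);
                if (cmp < 0)
                {
                    out.accept(nextExisting);                          // only in 'existing'
                    nextExisting = next(existing);
                }
                else if (cmp > 0)
                {
                    out.accept(nextUpdate);                            // only in 'update'
                    nextUpdate = next(update);
                }
                else
                {
                    out.accept(reconcile.apply(nextExisting, nextUpdate)); // same path: reconcile
                    nextExisting = next(existing);
                    nextUpdate = next(update);
                }
            }
            // Drain whichever side still has cells.
            while (nextExisting != null) { out.accept(nextExisting); nextExisting = next(existing); }
            while (nextUpdate != null) { out.accept(nextUpdate); nextUpdate = next(update); }
        }

        private static <C> C next(Iterator<C> it)
        {
            return it == null || !it.hasNext() ? null : it.next();
        }
    }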