http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 545da7a..ad21c69 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -17,35 +17,31 @@ */ package org.apache.cassandra.db.rows; -import java.nio.ByteBuffer; import java.util.*; -import com.google.common.collect.Iterators; - import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.utils.MergeIterator; import org.apache.cassandra.utils.SearchIterator; /** * Storage engine representation of a row. * - * A row is identified by it's clustering column values (it's an Unfiltered), - * has row level informations (deletion and partition key liveness infos (see below)) - * and contains data (Cells) regarding the columns it contains. + * A row mainly contains the following information: + * 1) Its {@code Clustering}, which holds the values for the clustering columns identifying the row. + * 2) Its row level information: the primary key liveness info and the row deletion (see + * {@link #primaryKeyLivenessInfo()} and {@link #deletion()} for more details). + * 3) Data for the columns it contains, or in other words, it's a (sorted) collection of + * {@code ColumnData}. * - * A row implements {@code WithLivenessInfo} and has thus a timestamp, ttl and - * local deletion time. Those information do not apply to the row content, they - * apply to the partition key columns. In other words, the timestamp is the - * timestamp for the partition key columns: it is what allows to distinguish - * between a dead row, and a live row but for which only the partition key columns - * are set. The ttl and local deletion time information are for the case where - * a TTL is set on those partition key columns. Note however that a row can have - * live cells but no partition key columns timestamp, because said timestamp (and - * its corresponding ttl) is only set on INSERT (not UPDATE). + * Also note that, as for every other storage engine object, a {@code Row} object cannot shadow + * its own data. For instance, a {@code Row} cannot contain a cell that is deleted by its own + * row deletion. */ -public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> +public interface Row extends Unfiltered, Iterable<ColumnData> { /** * The clustering values for this row. @@ -79,17 +75,14 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> * As a row is uniquely identified by its primary key, all its primary key columns * share the same {@code LivenessInfo}. This liveness information is what allows us * to distinguish between a dead row (it has no live cells and its primary key liveness - * info has no timestamp) and a live row but where all non PK columns are null (it has no - * live cells, but its primary key liveness has a timestamp). Please note that the ttl - * (and local deletion time) of the PK liveness information only apply to the - * liveness info timestamp, and not to the content of the row.
Also note that because - * in practice there is not way to only delete the primary key columns (without deleting - * the row itself), the returned {@code LivenessInfo} can only have a local deletion time - * if it has a TTL. + * info is empty) and a live row where all non PK columns are null (it has no + * live cells, but its primary key liveness is not empty). Please note that the liveness + * info (including its ttl/local deletion time, if any) only applies to the primary key + * columns and has no impact on the row content. * <p> - * Lastly, note that it is possible for a row to have live cells but no PK liveness - * info timestamp, because said timestamp is only set on {@code INSERT} (which makes sense - * in itself, see #6782) but live cells can be add through {@code UPDATE} even if the row + * Note in particular that a row may have live cells but no PK liveness info, because the + * primary key liveness information is only set on {@code INSERT} (which makes sense + * in itself, see #6782) but live cells can be added through {@code UPDATE} even if the row * wasn't pre-existing (which users are encouraged not to do, but we can't validate). */ public LivenessInfo primaryKeyLivenessInfo(); @@ -102,10 +95,10 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> public boolean isStatic(); /** - * Whether the row has no information whatsoever. This means no row infos - * (timestamp, ttl, deletion), no cells and no complex deletion info. + * Whether the row has no information whatsoever. This means no PK liveness info, no row + * deletion, no cells and no complex deletion info. * - * @return {@code true} if the row has no data whatsoever, {@code false} otherwise. + * @return {@code true} if the row has no data, {@code false} otherwise. */ public boolean isEmpty(); @@ -115,20 +108,8 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> public boolean hasLiveData(int nowInSec); /** - * Whether or not this row contains any deletion for a complex column. That is if - * there is at least one column for which {@code getDeletion} returns a non - * live deletion time. - */ - public boolean hasComplexDeletion(); - - /** * Returns a cell for a simple column. * - * Calls to this method are allowed to return the same Cell object, and hence the returned - * object is only valid until the next getCell/getCells call on the same Row object. You will need - * to copy the returned data if you plan on using a reference to the Cell object - * longer than that. - * * @param c the simple column for which to fetch the cell. * @return the corresponding cell or {@code null} if the row has no such cell. */ @@ -137,11 +118,6 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> /** * Return a cell for a given complex column and cell path. * - * Calls to this method are allowed to return the same Cell object, and hence the returned - * object is only valid until the next getCell/getCells call on the same Row object. You will need - * to copy the returned data if you plan on using a reference to the Cell object - * longer than that. - * * @param c the complex column for which to fetch the cell. * @param path the cell path for which to fetch the cell. * @return the corresponding cell or {@code null} if the row has no such cell. */ public Cell getCell(ColumnDefinition c, CellPath path); /** - * Returns an iterator on the cells of a complex column c.
- * - * Calls to this method are allowed to return the same iterator object, and - * hence the returned object is only valid until the next getCell/getCells call - * on the same Row object. You will need to copy the returned data if you - * plan on using a reference to the Cell object longer than that. + * The data for a complex column. + * <p> + * The returned object groups all the cells for the column, as well as its complex deletion (if relevant). * - * @param c the complex column for which to fetch the cells. - * @return an iterator on the cells of complex column {@code c} or {@code null} if the row has no - * cells for that column. + * @param c the complex column for which to return the complex data. + * @return the data for {@code c} or {@code null} if the row has no data for this column. */ - public Iterator<Cell> getCells(ColumnDefinition c); + public ComplexColumnData getComplexColumnData(ColumnDefinition c); /** - * Deletion informations for complex columns. + * An iterable over the cells of this row. + * <p> + * The iterable guarantees that cells are returned in order of {@link Cell#comparator}. * - * @param c the complex column for which to fetch deletion info. - * @return the deletion time for complex column {@code c} in this row. + * @return an iterable over the cells of this row. */ - public DeletionTime getDeletion(ColumnDefinition c); + public Iterable<Cell> cells(); /** - * An iterator over the cells of this row. - * - * The iterator guarantees that for 2 rows of the same partition, columns - * are returned in a consistent order in the sense that if the cells for - * column c1 is returned before the cells for column c2 by the first iterator, - * it is also the case for the 2nd iterator. - * - * The object returned by a call to next() is only guaranteed to be valid until - * the next call to hasNext() or next(). If a consumer wants to keep a - * reference on the returned Cell objects for longer than the iteration, it must - * make a copy of it explicitly. + * Whether the row stores any (non-live) complex deletion for any complex column. + */ + public boolean hasComplexDeletion(); + + /** + * Whether the row has any deletion info (row deletion, cell tombstone, expired cell or complex deletion). * - * @return an iterator over the cells of this row. + * @param nowInSec the current time in seconds used to decide if a cell is expired. */ - public Iterator<Cell> iterator(); + public boolean hasDeletion(int nowInSec); /** * An iterator to efficiently search data for a given column. @@ -195,134 +163,167 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> public SearchIterator<ColumnDefinition, ColumnData> searchIterator(); /** - * Copy this row to the provided writer. + * Returns a copy of this row that: + * 1) only includes the data for the columns included by {@code filter}. + * 2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}). + */ + public Row filter(ColumnFilter filter, CFMetaData metadata); + + /** + * Returns a copy of this row that: + * 1) only includes the data for the columns included by {@code filter}. + * 2) doesn't include any data that belongs to a dropped column (recorded in {@code metadata}). + * 3) doesn't include any data that is shadowed/deleted by {@code activeDeletion}. + * 4) uses {@code activeDeletion} as row deletion iff {@code setActiveDeletionToRow} and {@code activeDeletion} supersedes the row deletion.
+ */ + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata); + + /** + * Returns a copy of this row without any deletion info that should be purged according to {@code purger}. * - * @param writer the row writer to write this row to. + * @param purger the {@code DeletionPurger} to use to decide what can be purged. + * @param nowInSec the current time to decide what is deleted and what isn't (in the case of expired cells). + * @return this row but without any deletion info purged by {@code purger}. + */ + public Row purge(DeletionPurger purger, int nowInSec); + + /** + * Returns a copy of this row where all counter cells have their "local" shard marked for clearing. + */ + public Row markCounterLocalToBeCleared(); + + /** + * Returns a copy of this row where every live timestamp has been replaced by {@code newTimestamp} and every deletion timestamp + * by {@code newTimestamp - 1}. See {@link Commit} for why we need this. */ - public void copyTo(Row.Writer writer); + public Row updateAllTimestamp(long newTimestamp); + + public int dataSize(); + + public long unsharedHeapSizeExcludingData(); public String toString(CFMetaData metadata, boolean fullDetails); /** - * Interface for writing a row. + * Interface for building rows. * <p> - * Clients of this interface should abid to the following assumptions: - * 1) if the row has a non empty clustering (it's not a static one and it doesn't belong to a table without - * clustering columns), then that clustering should be the first thing written (through - * {@link ClusteringPrefix.Writer#writeClusteringValue})). - * 2) for a given complex column, calls to {@link #writeCell} are performed consecutively (without - * any call to {@code writeCell} for another column intermingled) and in {@code CellPath} order. - * 3) {@link #endOfRow} is always called to end the writing of a given row. + * The builder of a row should always abide by the following rules: + * 1) {@link #newRow} is always called as the first thing for the row. + * 2) {@link #addPrimaryKeyLivenessInfo} and {@link #addRowDeletion}, if called, are called before + * any {@link #addCell}/{@link #addComplexDeletion} call. + * 3) {@link #build} is called to construct the new row. The builder can then be reused. + + * There are two variants of builder: sorted and unsorted. A sorted builder expects the user to abide by the + * following additional rules: + * 4) Calls to {@link #addCell}/{@link #addComplexDeletion} are done in strictly increasing column order. + * In other words, all calls to these methods for a given column {@code c} are done after any call for + * any column before {@code c} and before any call for any column after {@code c}. + * 5) Calls to {@link #addCell} are further done in strictly increasing cell order (the one defined by + * {@link Cell#comparator}). That is, for a given column, cells are passed in {@code CellPath} order. + + * An unsorted builder does not require those last two rules: {@link #addCell} and {@link #addComplexDeletion} + * can be called in any order. In particular, an unsorted builder allows multiple calls for the same column/cell. In + * that latter case, the result will follow the usual reconciliation rules (so equal cells are reconciled with + * {@link Cells#reconcile} and the "biggest" of multiple complex deletions for the same column wins).
 */ - public interface Writer extends ClusteringPrefix.Writer + public interface Builder { /** - * Writes the livness information for the partition key columns of this row. + * Whether the builder is a sorted one or not. + * + * @return whether the builder requires calls to be done in sorted order (see above). + */ + public boolean isSorted(); + + /** + * Prepares the builder to build a new row of clustering {@code clustering}. + * <p> + * This should always be the first call for a given row. * - * This call is optional: skipping it is equivalent to calling {@code writePartitionKeyLivenessInfo(LivenessInfo.NONE)}. + * @param clustering the clustering for the new row. + */ + public void newRow(Clustering clustering); + + /** + * The clustering for the row that is currently being built. * - * @param info the liveness information for the partition key columns of the written row. + * @return the clustering for the row that is currently being built, or {@code null} if {@link #newRow} hasn't + * yet been called. */ - public void writePartitionKeyLivenessInfo(LivenessInfo info); + public Clustering clustering(); /** - * Writes the deletion information for this row. + * Adds the liveness information for the partition key columns of this row. + * + * This call is optional (skipping it is equivalent to calling {@code addPrimaryKeyLivenessInfo(LivenessInfo.EMPTY)}). + * + * @param info the liveness information for the partition key columns of the built row. + */ + public void addPrimaryKeyLivenessInfo(LivenessInfo info); + + /** + * Adds the deletion information for this row. * * This call is optional and can be skipped if the row is not deleted. * * @param deletion the row deletion time, or {@code DeletionTime.LIVE} if the row isn't deleted. */ - public void writeRowDeletion(DeletionTime deletion); + public void addRowDeletion(DeletionTime deletion); /** - * Writes a cell to the writer. + * Adds a cell to this builder. * - * As mentionned above, add cells for a given column should be added consecutively (and in {@code CellPath} order for complex columns). - * - * @param column the column for the written cell. - * @param isCounter whether or not this is a counter cell. - * @param value the value for the cell. For tombstones, which don't have values, this should be an empty buffer. - * @param info the cell liveness information. - * @param path the {@link CellPath} for complex cells and {@code null} for regular cells. + * @param cell the cell to add. */ - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path); + public void addCell(Cell cell); /** - * Writes a deletion for a complex column, that is one that apply to all cells of the complex column. + * Adds a complex deletion. * - * @param column the (complex) column this is a deletion for. - * @param complexDeletion the deletion time. + * @param column the column for which to add the {@code complexDeletion}. + * @param complexDeletion the complex deletion time to add. */ - public void writeComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion); + public void addComplexDeletion(ColumnDefinition column, DeletionTime complexDeletion); /** - * Should be called to indicates that the row has been fully written. + * Builds and returns the built row. + * + * @return the last row built by this builder. */ - public void endOfRow(); + public Row build(); } /** * Utility class to help merging rows from multiple inputs (UnfilteredRowIterators).
*/ - public abstract static class Merger + public static class Merger { - private final CFMetaData metadata; - private final int nowInSec; - private final UnfilteredRowIterators.MergeListener listener; private final Columns columns; + private final Row[] rows; + private final List<Iterator<ColumnData>> columnDataIterators; private Clustering clustering; - private final Row[] rows; private int rowsToMerge; + private int lastRowSet = -1; - private LivenessInfo rowInfo = LivenessInfo.NONE; - private DeletionTime rowDeletion = DeletionTime.LIVE; - - private final Cell[] cells; - private final List<Iterator<Cell>> complexCells; - private final ComplexColumnReducer complexReducer = new ComplexColumnReducer(); + private final List<ColumnData> dataBuffer = new ArrayList<>(); + private final ColumnDataReducer columnDataReducer; - // For the sake of the listener if there is one - private final DeletionTime[] complexDelTimes; - - private boolean signaledListenerForRow; - - public static Merger createStatic(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) - { - return new StaticMerger(metadata, size, nowInSec, columns, listener); - } - - public static Merger createRegular(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + public Merger(int size, int nowInSec, Columns columns) { - return new RegularMerger(metadata, size, nowInSec, columns, listener); - } - - protected Merger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) - { - this.metadata = metadata; - this.nowInSec = nowInSec; - this.listener = listener; this.columns = columns; this.rows = new Row[size]; - this.complexCells = new ArrayList<>(size); - - this.cells = new Cell[size]; - this.complexDelTimes = listener == null ? null : new DeletionTime[size]; + this.columnDataIterators = new ArrayList<>(size); + this.columnDataReducer = new ColumnDataReducer(size, nowInSec, columns.hasComplex()); } public void clear() { + dataBuffer.clear(); Arrays.fill(rows, null); - Arrays.fill(cells, null); - if (complexDelTimes != null) - Arrays.fill(complexDelTimes, null); - complexCells.clear(); + columnDataIterators.clear(); rowsToMerge = 0; - - rowInfo = LivenessInfo.NONE; - rowDeletion = DeletionTime.LIVE; - - signaledListenerForRow = false; + lastRowSet = -1; } public void add(int i, Row row) @@ -330,225 +331,187 @@ public interface Row extends Unfiltered, Iterable<Cell>, Aliasable<Row> clustering = row.clustering(); rows[i] = row; ++rowsToMerge; + lastRowSet = i; } - protected abstract Row.Writer getWriter(); - protected abstract Row getRow(); - public Row merge(DeletionTime activeDeletion) { // If for this clustering we have only one row version and have no activeDeletion (i.e. 
nothing to filter out), - // then we can just return that single row (we also should have no listener) - if (rowsToMerge == 1 && activeDeletion.isLive() && listener == null) + // then we can just return that single row + if (rowsToMerge == 1 && activeDeletion.isLive()) { - for (int i = 0; i < rows.length; i++) - if (rows[i] != null) - return rows[i]; - throw new AssertionError(); + Row row = rows[lastRowSet]; + assert row != null; + return row; } - Row.Writer writer = getWriter(); - Rows.writeClustering(clustering, writer); - - for (int i = 0; i < rows.length; i++) + LivenessInfo rowInfo = LivenessInfo.EMPTY; + DeletionTime rowDeletion = DeletionTime.LIVE; + for (Row row : rows) { - if (rows[i] == null) + if (row == null) continue; - rowInfo = rowInfo.mergeWith(rows[i].primaryKeyLivenessInfo()); - - if (rows[i].deletion().supersedes(rowDeletion)) - rowDeletion = rows[i].deletion(); + if (row.primaryKeyLivenessInfo().supersedes(rowInfo)) + rowInfo = row.primaryKeyLivenessInfo(); + if (row.deletion().supersedes(rowDeletion)) + rowDeletion = row.deletion(); } - if (rowDeletion.supersedes(activeDeletion)) + if (activeDeletion.supersedes(rowDeletion)) + rowDeletion = DeletionTime.LIVE; + else activeDeletion = rowDeletion; if (activeDeletion.deletes(rowInfo)) - rowInfo = LivenessInfo.NONE; + rowInfo = LivenessInfo.EMPTY; - writer.writePartitionKeyLivenessInfo(rowInfo); - writer.writeRowDeletion(rowDeletion); + for (Row row : rows) + columnDataIterators.add(row == null ? Collections.emptyIterator() : row.iterator()); - for (int i = 0; i < columns.simpleColumnCount(); i++) + columnDataReducer.setActiveDeletion(activeDeletion); + Iterator<ColumnData> merged = MergeIterator.get(columnDataIterators, ColumnData.comparator, columnDataReducer); + while (merged.hasNext()) { - ColumnDefinition c = columns.getSimple(i); - for (int j = 0; j < rows.length; j++) - cells[j] = rows[j] == null ? null : rows[j].getCell(c); - - reconcileCells(activeDeletion, writer); + ColumnData data = merged.next(); + if (data != null) + dataBuffer.add(data); } - complexReducer.activeDeletion = activeDeletion; - complexReducer.writer = writer; - for (int i = 0; i < columns.complexColumnCount(); i++) - { - ColumnDefinition c = columns.getComplex(i); - - DeletionTime maxComplexDeletion = DeletionTime.LIVE; - for (int j = 0; j < rows.length; j++) - { - if (rows[j] == null) - continue; - - DeletionTime dt = rows[j].getDeletion(c); - if (complexDelTimes != null) - complexDelTimes[j] = dt; - - if (dt.supersedes(maxComplexDeletion)) - maxComplexDeletion = dt; - } - - boolean overrideActive = maxComplexDeletion.supersedes(activeDeletion); - maxComplexDeletion = overrideActive ? maxComplexDeletion : DeletionTime.LIVE; - writer.writeComplexDeletion(c, maxComplexDeletion); - if (listener != null) - listener.onMergedComplexDeletion(c, maxComplexDeletion, complexDelTimes); - - mergeComplex(overrideActive ? maxComplexDeletion : activeDeletion, c); - } - writer.endOfRow(); - - Row row = getRow(); - // Because shadowed cells are skipped, the row could be empty. In which case - // we return null (we also don't want to signal anything in that case since that - // means everything in the row was shadowed and the listener will have been signalled - // for whatever shadows it). - if (row.isEmpty()) - return null; - - maybeSignalEndOfRow(); - return row; + // Because some data might have been shadowed by the 'activeDeletion', we could have an empty row + return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty() + ? 
null + : ArrayBackedRow.create(clustering, columns, rowInfo, rowDeletion, dataBuffer.size(), dataBuffer.toArray(new ColumnData[dataBuffer.size()])); } - private void maybeSignalListenerForRow() + public Clustering mergedClustering() { - if (listener != null && !signaledListenerForRow) - { - listener.onMergingRows(clustering, rowInfo, rowDeletion, rows); - signaledListenerForRow = true; - } + return clustering; } - private void maybeSignalListenerForCell(Cell merged, Cell[] versions) + public Row[] mergedRows() { - if (listener != null) - { - maybeSignalListenerForRow(); - listener.onMergedCells(merged, versions); - } + return rows; } - private void maybeSignalEndOfRow() + private static class ColumnDataReducer extends MergeIterator.Reducer<ColumnData, ColumnData> { - if (listener != null) - { - // If we haven't signaled the listener yet (we had no cells but some deletion info), do it now - maybeSignalListenerForRow(); - listener.onRowDone(); - } - } + private final int nowInSec; - private void reconcileCells(DeletionTime activeDeletion, Row.Writer writer) - { - Cell reconciled = null; - for (int j = 0; j < cells.length; j++) - { - Cell cell = cells[j]; - if (cell != null && !activeDeletion.deletes(cell.livenessInfo())) - reconciled = Cells.reconcile(reconciled, cell, nowInSec); - } + private ColumnDefinition column; + private final List<ColumnData> versions; - if (reconciled != null) + private DeletionTime activeDeletion; + + private final ComplexColumnData.Builder complexBuilder; + private final List<Iterator<Cell>> complexCells; + private final CellReducer cellReducer; + + public ColumnDataReducer(int size, int nowInSec, boolean hasComplex) { - reconciled.writeTo(writer); - maybeSignalListenerForCell(reconciled, cells); + this.nowInSec = nowInSec; + this.versions = new ArrayList<>(size); + this.complexBuilder = hasComplex ? ComplexColumnData.builder() : null; + this.complexCells = hasComplex ? new ArrayList<>(size) : null; + this.cellReducer = new CellReducer(nowInSec); } - } - private void mergeComplex(DeletionTime activeDeletion, ColumnDefinition c) - { - complexCells.clear(); - for (int j = 0; j < rows.length; j++) + public void setActiveDeletion(DeletionTime activeDeletion) { - Row row = rows[j]; - Iterator<Cell> iter = row == null ? null : row.getCells(c); - complexCells.add(iter == null ? Iterators.<Cell>emptyIterator() : iter); + this.activeDeletion = activeDeletion; } - complexReducer.column = c; - complexReducer.activeDeletion = activeDeletion; - - // Note that we use the mergeIterator only to group cells to merge, but we - // write the result to the writer directly in the reducer, so all we care - // about is iterating over the result. - Iterator<Void> iter = MergeIterator.get(complexCells, c.cellComparator(), complexReducer); - while (iter.hasNext()) - iter.next(); - } - - private class ComplexColumnReducer extends MergeIterator.Reducer<Cell, Void> - { - private DeletionTime activeDeletion; - private Row.Writer writer; - private ColumnDefinition column; - - public void reduce(int idx, Cell current) + public void reduce(int idx, ColumnData data) { - cells[idx] = current; + column = data.column(); + versions.add(data); } - protected Void getReduced() + protected ColumnData getReduced() { - reconcileCells(activeDeletion, writer); - return null; + if (column.isSimple()) + { + Cell merged = null; + for (ColumnData data : versions) + { + Cell cell = (Cell)data; + if (!activeDeletion.deletes(cell)) + merged = merged == null ? 
cell : Cells.reconcile(merged, cell, nowInSec); + } + return merged; + } + else + { + complexBuilder.newColumn(column); + complexCells.clear(); + DeletionTime complexDeletion = DeletionTime.LIVE; + for (ColumnData data : versions) + { + ComplexColumnData cd = (ComplexColumnData)data; + if (cd.complexDeletion().supersedes(complexDeletion)) + complexDeletion = cd.complexDeletion(); + complexCells.add(cd.iterator()); + } + + if (complexDeletion.supersedes(activeDeletion)) + { + cellReducer.setActiveDeletion(complexDeletion); + complexBuilder.addComplexDeletion(complexDeletion); + } + else + { + cellReducer.setActiveDeletion(activeDeletion); + } + + Iterator<Cell> cells = MergeIterator.get(complexCells, ColumnData.cellComparator, cellReducer); + while (cells.hasNext()) + { + Cell merged = cells.next(); + if (merged != null) + complexBuilder.addCell(merged); + } + return complexBuilder.build(); + } } protected void onKeyChange() { - Arrays.fill(cells, null); + versions.clear(); } } - private static class StaticMerger extends Merger + private static class CellReducer extends MergeIterator.Reducer<Cell, Cell> { - private final StaticRow.Builder builder; + private final int nowInSec; - private StaticMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) - { - super(metadata, size, nowInSec, columns, listener); - this.builder = StaticRow.builder(columns, true, metadata.isCounter()); - } + private DeletionTime activeDeletion; + private Cell merged; - protected Row.Writer getWriter() + public CellReducer(int nowInSec) { - return builder; + this.nowInSec = nowInSec; } - protected Row getRow() + public void setActiveDeletion(DeletionTime activeDeletion) { - return builder.build(); + this.activeDeletion = activeDeletion; + onKeyChange(); } - } - - private static class RegularMerger extends Merger - { - private final ReusableRow row; - private RegularMerger(CFMetaData metadata, int size, int nowInSec, Columns columns, UnfilteredRowIterators.MergeListener listener) + public void reduce(int idx, Cell cell) { - super(metadata, size, nowInSec, columns, listener); - this.row = new ReusableRow(metadata.clusteringColumns().size(), columns, true, metadata.isCounter()); + if (!activeDeletion.deletes(cell)) + merged = merged == null ? cell : Cells.reconcile(merged, cell, nowInSec); } - protected Row.Writer getWriter() + protected Cell getReduced() { - return row.writer(); + return merged; } - protected Row getRow() + protected void onKeyChange() { - return row; + merged = null; } } }
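For readers tracking the API change above: the push-style Row.Writer protocol is replaced by the Row.Builder contract. Below is a minimal sketch of how a caller might drive a sorted builder under rules 1)-5); it is not part of the patch, the helper method and its parameters are hypothetical, and the builder instance is assumed to be obtained from one of the concrete Row implementations.

    // Hypothetical helper, assumed to live in org.apache.cassandra.db.rows.
    // Cells are assumed to arrive grouped by column, in column order, then CellPath order.
    static Row buildSortedRow(Row.Builder builder,
                              Clustering clustering,
                              LivenessInfo pkLiveness,
                              Iterable<Cell> cellsInOrder)
    {
        assert builder.isSorted();
        builder.newRow(clustering);                    // rule 1: always the first call for a row
        builder.addPrimaryKeyLivenessInfo(pkLiveness); // rule 2: row-level info before any cell
        for (Cell cell : cellsInOrder)                 // rules 4 and 5: sorted input
            builder.addCell(cell);
        return builder.build();                        // rule 3: the builder can then be reused
    }

An unsorted builder accepts the same calls in any order and falls back on the reconciliation rules (Cells.reconcile for equal cells, biggest complex deletion wins) described in the Builder javadoc.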
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java new file mode 100644 index 0000000..2a10199 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowAndDeletionMergeIterator.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.Comparator; +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; + +/** + * An iterator that merges a source of rows with the range tombstones and partition level deletion of a given partition. + * <p> + * This is used by our {@code Partition} implementations to produce an {@code UnfilteredRowIterator} by merging the rows + * and deletion infos that are kept separate. It also has 2 additional roles: + * 1) it makes sure the rows returned only include the columns selected for the resulting iterator. + * 2) it (optionally) removes any data that can be shadowed (see the comment on 'removeShadowedData' below for more details) + */ +public class RowAndDeletionMergeIterator extends AbstractUnfilteredRowIterator +{ + // For some of our Partition implementations, we can't guarantee that the deletion information (partition level + // deletion and range tombstones) doesn't shadow data in the rows. If that is the case, this class also takes + // care of skipping such shadowed data (since it is the contract of an UnfilteredRowIterator that it doesn't + // shadow its own data). Sometimes however, we know this can't happen, in which case we can skip that step. + private final boolean removeShadowedData; + private final Comparator<Clusterable> comparator; + private final ColumnFilter selection; + + private final Iterator<Row> rows; + private Row nextRow; + + private final Iterator<RangeTombstone> ranges; + private RangeTombstone nextRange; + + // The currently open tombstone. Note that unless this is null, there is no point in checking nextRange. + private RangeTombstone openRange; + + public RowAndDeletionMergeIterator(CFMetaData metadata, + DecoratedKey partitionKey, + DeletionTime partitionLevelDeletion, + ColumnFilter selection, + Row staticRow, + boolean isReversed, + RowStats stats, + Iterator<Row> rows, + Iterator<RangeTombstone> ranges, + boolean removeShadowedData) + { + super(metadata, partitionKey, partitionLevelDeletion, selection.fetchedColumns(), staticRow, isReversed, stats); + this.comparator = isReversed ?
metadata.comparator.reversed() : metadata.comparator; + this.selection = selection; + this.removeShadowedData = removeShadowedData; + this.rows = rows; + this.ranges = ranges; + } + + protected Unfiltered computeNext() + { + while (true) + { + updateNextRow(); + if (nextRow == null) + { + if (openRange != null) + return closeOpenedRange(); + + updateNextRange(); + return nextRange == null ? endOfData() : openRange(); + } + + // We have a next row + + if (openRange == null) + { + // We have no currently open tombstone range. So check if we have a next range and if it sorts before this row. + // If it does, the opening of that range should go first. Otherwise, the row goes first. + updateNextRange(); + if (nextRange != null && comparator.compare(openBound(nextRange), nextRow.clustering()) < 0) + return openRange(); + + Row row = consumeNextRow(); + // it's possible for the row to be fully shadowed by the current range tombstone + if (row != null) + return row; + } + else + { + // We have both a next row and a currently opened tombstone. Check which goes first between the range closing and the row. + if (comparator.compare(closeBound(openRange), nextRow.clustering()) < 0) + return closeOpenedRange(); + + Row row = consumeNextRow(); + if (row != null) + return row; + } + } + } + + private void updateNextRow() + { + if (nextRow == null && rows.hasNext()) + nextRow = rows.next(); + } + + private void updateNextRange() + { + while (nextRange == null && ranges.hasNext()) + { + nextRange = ranges.next(); + if (removeShadowedData && partitionLevelDeletion().supersedes(nextRange.deletionTime())) + nextRange = null; + } + } + + private Row consumeNextRow() + { + Row row = nextRow; + nextRow = null; + if (!removeShadowedData) + return row.filter(selection, metadata()); + + DeletionTime activeDeletion = openRange == null ? 
partitionLevelDeletion() : openRange.deletionTime(); + return row.filter(selection, activeDeletion, false, metadata()); + } + + private RangeTombstone consumeNextRange() + { + RangeTombstone range = nextRange; + nextRange = null; + return range; + } + + private RangeTombstone consumeOpenRange() + { + RangeTombstone range = openRange; + openRange = null; + return range; + } + + private Slice.Bound openBound(RangeTombstone range) + { + return range.deletedSlice().open(isReverseOrder()); + } + + private Slice.Bound closeBound(RangeTombstone range) + { + return range.deletedSlice().close(isReverseOrder()); + } + + private RangeTombstoneMarker closeOpenedRange() + { + // Check if that close is actually a boundary between markers + updateNextRange(); + RangeTombstoneMarker marker; + if (nextRange != null && comparator.compare(closeBound(openRange), openBound(nextRange)) == 0) + { + marker = RangeTombstoneBoundaryMarker.makeBoundary(isReverseOrder(), closeBound(openRange), openBound(nextRange), openRange.deletionTime(), nextRange.deletionTime()); + openRange = consumeNextRange(); + } + else + { + RangeTombstone toClose = consumeOpenRange(); + marker = new RangeTombstoneBoundMarker(closeBound(toClose), toClose.deletionTime()); + } + return marker; + } + + private RangeTombstoneMarker openRange() + { + assert openRange == null && nextRange != null; + openRange = consumeNextRange(); + return new RangeTombstoneBoundMarker(openBound(openRange), openRange.deletionTime()); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java b/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java deleted file mode 100644 index 3d204d3..0000000 --- a/src/java/org/apache/cassandra/db/rows/RowAndTombstoneMergeIterator.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.cassandra.db.rows; - -import java.util.Comparator; -import java.util.Iterator; - -import com.google.common.collect.PeekingIterator; -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.db.*; - -public class RowAndTombstoneMergeIterator extends UnmodifiableIterator<Unfiltered> implements PeekingIterator<Unfiltered> -{ - private final Comparator<Clusterable> comparator; - private final boolean reversed; - - private Iterator<Row> rowIter; - private Row nextRow; - - private Iterator<RangeTombstone> tombstoneIter; - private RangeTombstone nextTombstone; - private boolean inTombstone; - - private Unfiltered next; - - public RowAndTombstoneMergeIterator(ClusteringComparator comparator, boolean reversed) - { - this.comparator = reversed ? comparator.reversed() : comparator; - this.reversed = reversed; - } - - public RowAndTombstoneMergeIterator setTo(Iterator<Row> rowIter, Iterator<RangeTombstone> tombstoneIter) - { - this.rowIter = rowIter; - this.tombstoneIter = tombstoneIter; - this.nextRow = null; - this.nextTombstone = null; - this.next = null; - this.inTombstone = false; - return this; - } - - public boolean isSet() - { - return rowIter != null; - } - - private void prepareNext() - { - if (next != null) - return; - - if (nextTombstone == null && tombstoneIter.hasNext()) - nextTombstone = tombstoneIter.next(); - if (nextRow == null && rowIter.hasNext()) - nextRow = rowIter.next(); - - if (nextTombstone == null) - { - if (nextRow == null) - return; - - next = nextRow; - nextRow = null; - } - else if (nextRow == null) - { - if (inTombstone) - { - RangeTombstone rt = nextTombstone; - nextTombstone = tombstoneIter.hasNext() ? tombstoneIter.next() : null; - // An end and a start makes a boundary if they sort similarly - if (nextTombstone != null - && comparator.compare(rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)) == 0) - { - next = RangeTombstoneBoundaryMarker.makeBoundary(reversed, - rt.deletedSlice().close(reversed), - nextTombstone.deletedSlice().open(reversed), - rt.deletionTime(), - nextTombstone.deletionTime()); - } - else - { - inTombstone = false; - next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime()); - } - } - else - { - inTombstone = true; - next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime()); - } - } - else if (inTombstone) - { - if (comparator.compare(nextTombstone.deletedSlice().close(reversed), nextRow.clustering()) < 0) - { - RangeTombstone rt = nextTombstone; - nextTombstone = tombstoneIter.hasNext() ? 
tombstoneIter.next() : null; - if (nextTombstone != null - && comparator.compare(rt.deletedSlice().close(reversed), nextTombstone.deletedSlice().open(reversed)) == 0) - { - next = RangeTombstoneBoundaryMarker.makeBoundary(reversed, - rt.deletedSlice().close(reversed), - nextTombstone.deletedSlice().open(reversed), - rt.deletionTime(), - nextTombstone.deletionTime()); - } - else - { - inTombstone = false; - next = new RangeTombstoneBoundMarker(rt.deletedSlice().close(reversed), rt.deletionTime()); - } - } - else - { - next = nextRow; - nextRow = null; - } - } - else - { - if (comparator.compare(nextTombstone.deletedSlice().open(reversed), nextRow.clustering()) < 0) - { - inTombstone = true; - next = new RangeTombstoneBoundMarker(nextTombstone.deletedSlice().open(reversed), nextTombstone.deletionTime()); - } - else - { - next = nextRow; - nextRow = null; - } - } - } - - public boolean hasNext() - { - prepareNext(); - return next != null; - } - - public Unfiltered next() - { - prepareNext(); - Unfiltered toReturn = next; - next = null; - return toReturn; - } - - public Unfiltered peek() - { - prepareNext(); - return next(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowDataBlock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java b/src/java/org/apache/cassandra/db/rows/RowDataBlock.java deleted file mode 100644 index b1e2b13..0000000 --- a/src/java/org/apache/cassandra/db/rows/RowDataBlock.java +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.nio.ByteBuffer; -import java.util.*; - -import com.google.common.collect.UnmodifiableIterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.ObjectSizes; - -/** - * A {@code RowDataBlock} holds data for one or more row (of a given table). More precisely, it contains - * cell data and complex deletion data (for complex columns) and allow access to this data. Please note - * however that {@code RowDataBlock} only holds the data inside the row, it does not hold the data - * pertaining to the row itself: clustering, partition key liveness info and row deletion. - * <p> - * {@code RowDataBlock} is largely an implementation detail: it is only there to be reused by - * {@link AbstractPartitionData} and every concrete row implementation. 
- */ -public class RowDataBlock -{ - private static final Logger logger = LoggerFactory.getLogger(RowDataBlock.class); - - private static final long EMPTY_SIZE = ObjectSizes.measure(new RowDataBlock(Columns.NONE, 0, false, false)); - - // We distinguish 2 sub-objects: SimpleRowDataBlock that contains the data for the simple columns only, - // and ComplexRowDataBlock that only contains data for complex columns. The reason for having 2 separate - // objects is that simple columns are much easier to handle since we have only a single cell per-object - // and thus having a more specialized object allow a simpler and more efficient handling. - final SimpleRowDataBlock simpleData; - final ComplexRowDataBlock complexData; - - public RowDataBlock(Columns columns, int rows, boolean sortable, boolean isCounter) - { - this.simpleData = columns.hasSimple() ? new SimpleRowDataBlock(columns, rows, isCounter) : null; - this.complexData = columns.hasComplex() ? ComplexRowDataBlock.create(columns, rows, sortable, isCounter) : null; - } - - public Columns columns() - { - if (simpleData != null) - return simpleData.columns(); - if (complexData != null) - return complexData.columns(); - return Columns.NONE; - } - - /** - * Return the cell value for a given column of a given row. - * - * @param row the row for which to return the cell value. - * @param column the column for which to return the cell value. - * @param path the cell path for which to return the cell value. Can be null for - * simple columns. - * - * @return the value of the cell of path {@code path} for {@code column} in row {@code row}, or - * {@code null} if their is no such cell. - */ - public ByteBuffer getValue(int row, ColumnDefinition column, CellPath path) - { - if (column.isComplex()) - { - return complexData.getValue(row, column, path); - } - else - { - int idx = columns().simpleIdx(column, 0); - assert idx >= 0; - return simpleData.data.value((row * columns().simpleColumnCount()) + idx); - } - } - - /** - * Sets the cell value for a given simple column of a given row. - * - * @param row the row for which to set the cell value. - * @param column the simple column for which to set the cell value. - * @param path the cell path for which to return the cell value. Can be null for - * simple columns. - * @param value the value to set. - */ - public void setValue(int row, ColumnDefinition column, CellPath path, ByteBuffer value) - { - if (column.isComplex()) - { - complexData.setValue(row, column, path, value); - } - else - { - int idx = columns().simpleIdx(column, 0); - assert idx >= 0; - simpleData.data.setValue((row * columns().simpleColumnCount()) + idx, value); - } - } - - public static ReusableIterator reusableIterator() - { - return new ReusableIterator(); - } - - // Swap row i and j - public void swap(int i, int j) - { - if (simpleData != null) - simpleData.swap(i, j); - if (complexData != null) - complexData.swap(i, j); - } - - // Merge row i into j - public void merge(int i, int j, int nowInSec) - { - if (simpleData != null) - simpleData.merge(i, j, nowInSec); - if (complexData != null) - complexData.merge(i, j, nowInSec); - } - - // Move row i into j - public void move(int i, int j) - { - if (simpleData != null) - simpleData.move(i, j); - if (complexData != null) - complexData.move(i, j); - } - - public boolean hasComplexDeletion(int row) - { - return complexData != null && complexData.hasComplexDeletion(row); - } - - public long unsharedHeapSizeExcludingData() - { - return EMPTY_SIZE - + (simpleData == null ? 
0 : simpleData.unsharedHeapSizeExcludingData()) - + (complexData == null ? 0 : complexData.unsharedHeapSizeExcludingData()); - } - - public static int computeNewCapacity(int currentCapacity, int idxToSet) - { - int newCapacity = currentCapacity == 0 ? 4 : currentCapacity; - while (idxToSet >= newCapacity) - newCapacity = 1 + (newCapacity * 3) / 2; - return newCapacity; - } - - public int dataSize() - { - return (simpleData == null ? 0 : simpleData.dataSize()) - + (complexData == null ? 0 : complexData.dataSize()); - } - - public void clear() - { - if (simpleData != null) - simpleData.clear(); - if (complexData != null) - complexData.clear(); - } - - public abstract static class Writer implements Row.Writer - { - private final boolean inOrderCells; - - protected int row; - - protected SimpleRowDataBlock.CellWriter simpleWriter; - protected ComplexRowDataBlock.CellWriter complexWriter; - - protected Writer(boolean inOrderCells) - { - this.inOrderCells = inOrderCells; - } - - protected Writer(RowDataBlock data, boolean inOrderCells) - { - this(inOrderCells); - updateWriter(data); - } - - protected void updateWriter(RowDataBlock data) - { - this.simpleWriter = data.simpleData == null ? null : data.simpleData.cellWriter(inOrderCells); - this.complexWriter = data.complexData == null ? null : data.complexData.cellWriter(inOrderCells); - } - - public Writer reset() - { - row = 0; - - if (simpleWriter != null) - simpleWriter.reset(); - if (complexWriter != null) - complexWriter.reset(); - - return this; - } - - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - if (column.isComplex()) - complexWriter.addCell(column, value, info, path); - else - simpleWriter.addCell(column, value, info); - } - - public void writeComplexDeletion(ColumnDefinition c, DeletionTime complexDeletion) - { - if (complexDeletion.isLive()) - return; - - complexWriter.setComplexDeletion(c, complexDeletion); - } - - public void endOfRow() - { - ++row; - if (simpleWriter != null) - simpleWriter.endOfRow(); - if (complexWriter != null) - complexWriter.endOfRow(); - } - } - - static class ReusableIterator extends UnmodifiableIterator<Cell> implements Iterator<Cell> - { - private SimpleRowDataBlock.ReusableIterator simpleIterator; - private ComplexRowDataBlock.ReusableIterator complexIterator; - - public ReusableIterator() - { - this.simpleIterator = SimpleRowDataBlock.reusableIterator(); - this.complexIterator = ComplexRowDataBlock.reusableIterator(); - } - - public ReusableIterator setTo(RowDataBlock dataBlock, int row) - { - simpleIterator.setTo(dataBlock.simpleData, row); - complexIterator.setTo(dataBlock.complexData, row); - return this; - } - - public boolean hasNext() - { - return simpleIterator.hasNext() || complexIterator.hasNext(); - } - - public Cell next() - { - return simpleIterator.hasNext() ? simpleIterator.next() : complexIterator.next(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowDiffListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowDiffListener.java b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java new file mode 100644 index 0000000..50d6d32 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowDiffListener.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; + +/** + * Interface that allows acting on the result of merging multiple rows. + * + * More precisely, given N rows and the result of merging them, one can call {@link Rows#diff()} + * with a {@code RowDiffListener} and that listener will be informed for each input row of the diff between + * that input and the merged row. + */ +public interface RowDiffListener +{ + /** + * Called for the row primary key liveness info of input {@code i}. + * + * @param i the input row {@code original} is from. + * @param clustering the clustering for the row that is merged. + * @param merged the primary key liveness info of the merged row. Will be {@code null} if input {@code i} had + * a {@code LivenessInfo}, but the merged result doesn't (i.e. the original info has been shadowed/deleted). + * @param original the primary key liveness info of input {@code i}. May be {@code null} if input {@code i} + * has no primary key liveness info (i.e. it has {@code LivenessInfo.EMPTY}) but the merged result has. + */ + public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original); + + /** + * Called for the row deletion of input {@code i}. + * + * @param i the input row {@code original} is from. + * @param clustering the clustering for the row that is merged. + * @param merged the deletion of the merged row. Will be {@code null} if input {@code i} had a deletion + * but the merged result doesn't (i.e. the deletion has been shadowed). + * @param original the deletion of input {@code i}. May be {@code null} if input {@code i} had no deletion but the merged row has. + */ + public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original); + + /** + * Called for every (non-live) complex deletion of any complex column present in either the merged row or input {@code i}. + * + * @param i the input row {@code original} is from. + * @param clustering the clustering for the row that is merged. + * @param column the column this is a complex deletion of. + * @param merged the complex deletion of the merged row. Will be {@code null} if input {@code i} had a complex deletion + * for {@code column} but the merged result doesn't (i.e. the deletion has been shadowed). + * @param original the complex deletion of input {@code i} for column {@code column}. May be {@code null} if input {@code i} + * had no complex deletion but the merged row has. + */ + public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original); + + /** + * Called for any cell that is either in the merged row or in input {@code i}.
+ * + * @param i the input row {@code original} is from. + * @param clustering the clustering for the row that is merged. + * @param merged the cell of the merged row. Will be {@code null} if input {@code i} had a cell but that cell is not present + * in the merged result (it has been deleted/shadowed). + * @param original the cell of input {@code i}. May be {@code null} if input {@code i} had no cell corresponding to {@code merged}. + */ + public void onCell(int i, Clustering clustering, Cell merged, Cell original); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index a3bd913..766cf19 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.utils.FBUtilities; /** @@ -37,19 +36,6 @@ public abstract class RowIterators private RowIterators() {} - public static PartitionUpdate toUpdate(RowIterator iterator) - { - PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1); - - if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) - iterator.staticRow().copyTo(update.staticWriter()); - - while (iterator.hasNext()) - iterator.next().copyTo(update.writer()); - - return update; - } - public static void digest(RowIterator iterator, MessageDigest digest) { // TODO: we're not computing digest the same way that old nodes so we'll need @@ -123,11 +109,11 @@ public abstract class RowIterators { CFMetaData metadata = iterator.metadata(); logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}", - new Object[]{ id, - metadata.ksName, - metadata.cfName, - metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), - iterator.isReverseOrder() }); + id, + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), + iterator.isReverseOrder()); return new WrappingRowIterator(iterator) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/RowStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java index c672490..5b0d3bd 100644 --- a/src/java/org/apache/cassandra/db/rows/RowStats.java +++ b/src/java/org/apache/cassandra/db/rows/RowStats.java @@ -17,17 +17,17 @@ */ package org.apache.cassandra.db.rows; -import java.io.DataInput; import java.io.IOException; import java.util.Objects; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP; import static org.apache.cassandra.db.LivenessInfo.NO_TTL; -import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME; +import static
org.apache.cassandra.db.LivenessInfo.NO_EXPIRATION_TIME; /** * General statistics on rows (and tombstones) for a given source. @@ -45,7 +45,7 @@ import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME; public class RowStats { // We should use this sparingly obviously - public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_DELETION_TIME, NO_TTL, -1); + public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_EXPIRATION_TIME, NO_TTL, -1); public static final Serializer serializer = new Serializer(); @@ -74,7 +74,7 @@ public class RowStats public boolean hasMinLocalDeletionTime() { - return minLocalDeletionTime != NO_DELETION_TIME; + return minLocalDeletionTime != NO_EXPIRATION_TIME; } /** @@ -89,9 +89,9 @@ public class RowStats ? that.minTimestamp : (that.minTimestamp == NO_TIMESTAMP ? this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp)); - int minDelTime = this.minLocalDeletionTime == NO_DELETION_TIME + int minDelTime = this.minLocalDeletionTime == NO_EXPIRATION_TIME ? that.minLocalDeletionTime - : (that.minLocalDeletionTime == NO_DELETION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime)); + : (that.minLocalDeletionTime == NO_EXPIRATION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime)); int minTTL = this.minTTL == NO_TTL ? that.minTTL @@ -132,7 +132,7 @@ public class RowStats return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow); } - public static class Collector + public static class Collector implements PartitionStatisticsCollector { private boolean isTimestampSet; private long minTimestamp = Long.MAX_VALUE; @@ -147,6 +147,27 @@ public class RowStats private long totalColumnsSet; private long rows; + public void update(LivenessInfo info) + { + if (info.isEmpty()) + return; + + updateTimestamp(info.timestamp()); + + if (info.isExpiring()) + { + updateTTL(info.ttl()); + updateLocalDeletionTime(info.localExpirationTime()); + } + } + + public void update(Cell cell) + { + updateTimestamp(cell.timestamp()); + updateTTL(cell.ttl()); + updateLocalDeletionTime(cell.localDeletionTime()); + } + public void updateTimestamp(long timestamp) { if (timestamp == NO_TIMESTAMP) @@ -158,14 +179,14 @@ public class RowStats public void updateLocalDeletionTime(int deletionTime) { - if (deletionTime == NO_DELETION_TIME) + if (deletionTime == NO_EXPIRATION_TIME) return; isDelTimeSet = true; minDeletionTime = Math.min(minDeletionTime, deletionTime); } - public void updateDeletionTime(DeletionTime deletionTime) + public void update(DeletionTime deletionTime) { if (deletionTime.isLive()) return; @@ -183,7 +204,7 @@ public class RowStats minTTL = Math.min(minTTL, ttl); } - public void updateColumnSetPerRow(int columnSetInRow) + public void updateColumnSetPerRow(long columnSetInRow) { updateColumnSetPerRow(columnSetInRow, 1); } @@ -198,12 +219,17 @@ public class RowStats this.rows += rows; } + public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards) + { + // We don't care about this but it comes with PartitionStatisticsCollector + } + public RowStats get() { return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP, - isDelTimeSet ? minDeletionTime : NO_DELETION_TIME, - isTTLSet ? minTTL : NO_TTL, - isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1); + isDelTimeSet ? minDeletionTime : NO_EXPIRATION_TIME, + isTTLSet ?
minTTL : NO_TTL, + isColumnSetPerRowSet ? (rows == 0 ? 0 : (int)(totalColumnsSet / rows)) : -1); } } @@ -211,26 +237,26 @@ public class RowStats { public void serialize(RowStats stats, DataOutputPlus out) throws IOException { - out.writeLong(stats.minTimestamp); - out.writeInt(stats.minLocalDeletionTime); - out.writeInt(stats.minTTL); - out.writeInt(stats.avgColumnSetPerRow); + out.writeVInt(stats.minTimestamp); + out.writeVInt(stats.minLocalDeletionTime); + out.writeVInt(stats.minTTL); + out.writeVInt(stats.avgColumnSetPerRow); } public int serializedSize(RowStats stats) { - return TypeSizes.sizeof(stats.minTimestamp) - + TypeSizes.sizeof(stats.minLocalDeletionTime) - + TypeSizes.sizeof(stats.minTTL) - + TypeSizes.sizeof(stats.avgColumnSetPerRow); + return TypeSizes.sizeofVInt(stats.minTimestamp) + + TypeSizes.sizeofVInt(stats.minLocalDeletionTime) + + TypeSizes.sizeofVInt(stats.minTTL) + + TypeSizes.sizeofVInt(stats.avgColumnSetPerRow); } - public RowStats deserialize(DataInput in) throws IOException + public RowStats deserialize(DataInputPlus in) throws IOException { - long minTimestamp = in.readLong(); - int minLocalDeletionTime = in.readInt(); - int minTTL = in.readInt(); - int avgColumnSetPerRow = in.readInt(); + long minTimestamp = in.readVInt(); + int minLocalDeletionTime = (int)in.readVInt(); + int minTTL = (int)in.readVInt(); + int avgColumnSetPerRow = (int)in.readVInt(); return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow); } }
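To make the new collection surface concrete, here is a short sketch of how the PartitionStatisticsCollector-flavoured Collector might be fed in a single pass; the loop itself is hypothetical (not from this patch), but the update overloads and get() are the ones added above.

    // Hypothetical one-pass stats collection, assumed to live in org.apache.cassandra.db.rows.
    static RowStats collectStats(Iterable<Row> rows)
    {
        RowStats.Collector collector = new RowStats.Collector();
        for (Row row : rows)
        {
            collector.update(row.primaryKeyLivenessInfo()); // no-op if the info is empty
            collector.update(row.deletion());               // no-op if the deletion is live
            for (Cell cell : row.cells())
                collector.update(cell);                     // tracks timestamp, ttl and local deletion time
        }
        return collector.get(); // unset fields fall back to the NO_TIMESTAMP/NO_EXPIRATION_TIME/NO_TTL sentinels
    }

Note also that the serializer now uses writeVInt/readVInt, so a RowStats that previously always took 20 fixed bytes (one long plus three ints) typically serializes to just a few.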
