http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/RowStats.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowStats.java b/src/java/org/apache/cassandra/db/rows/RowStats.java new file mode 100644 index 0000000..1bffdbe --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/RowStats.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataOutputPlus; + +import static org.apache.cassandra.db.LivenessInfo.NO_TIMESTAMP; +import static org.apache.cassandra.db.LivenessInfo.NO_TTL; +import static org.apache.cassandra.db.LivenessInfo.NO_DELETION_TIME; + +/** + * General statistics on rows (and and tombstones) for a given source. + * <p> + * Those stats are used to optimize the on-wire and on-disk storage of rows. 
More precisely, + * the {@code minTimestamp}, {@code minLocalDeletionTime} and {@code minTTL} stats are used to + * delta-encode those information for the sake of vint encoding. And {@code avgColumnSetPerRow} + * is used to decide if cells should be stored in a sparse or dense way (see {@link UnfilteredSerializer}). + * <p> + * Note that due to their use, those stats can suffer to be somewhat inaccurate (the more incurrate + * they are, the less effective the storage will be, but provided the stats are not completly wacky, + * this shouldn't have too huge an impact on performance) and in fact they will not always be + * accurate for reasons explained in {@link SerializationHeader#make}. + */ +public class RowStats +{ + // We should use this sparingly obviously + public static final RowStats NO_STATS = new RowStats(NO_TIMESTAMP, NO_DELETION_TIME, NO_TTL, -1); + + public static final Serializer serializer = new Serializer(); + + public final long minTimestamp; + public final int minLocalDeletionTime; + public final int minTTL; + + // Will be < 0 if the value is unknown + public final int avgColumnSetPerRow; + + public RowStats(long minTimestamp, + int minLocalDeletionTime, + int minTTL, + int avgColumnSetPerRow) + { + this.minTimestamp = minTimestamp; + this.minLocalDeletionTime = minLocalDeletionTime; + this.minTTL = minTTL; + this.avgColumnSetPerRow = avgColumnSetPerRow; + } + + public boolean hasMinTimestamp() + { + return minTimestamp != NO_TIMESTAMP; + } + + public boolean hasMinLocalDeletionTime() + { + return minLocalDeletionTime != NO_DELETION_TIME; + } + + /** + * Merge this stats with another one. + * <p> + * The comments of {@link SerializationHeader#make} applies here too, i.e. the result of + * merging will be not totally accurate but we can live with that. + */ + public RowStats mergeWith(RowStats that) + { + long minTimestamp = this.minTimestamp == NO_TIMESTAMP + ? that.minTimestamp + : (that.minTimestamp == NO_TIMESTAMP ? 
this.minTimestamp : Math.min(this.minTimestamp, that.minTimestamp)); + + int minDelTime = this.minLocalDeletionTime == NO_DELETION_TIME + ? that.minLocalDeletionTime + : (that.minLocalDeletionTime == NO_DELETION_TIME ? this.minLocalDeletionTime : Math.min(this.minLocalDeletionTime, that.minLocalDeletionTime)); + + int minTTL = this.minTTL == NO_TTL + ? that.minTTL + : (that.minTTL == NO_TTL ? this.minTTL : Math.min(this.minTTL, that.minTTL)); + + int avgColumnSetPerRow = this.avgColumnSetPerRow < 0 + ? that.avgColumnSetPerRow + : (that.avgColumnSetPerRow < 0 ? this.avgColumnSetPerRow : (this.avgColumnSetPerRow + that.avgColumnSetPerRow) / 2); + + return new RowStats(minTimestamp, minDelTime, minTTL, avgColumnSetPerRow); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RowStats rowStats = (RowStats) o; + + if (avgColumnSetPerRow != rowStats.avgColumnSetPerRow) return false; + if (minLocalDeletionTime != rowStats.minLocalDeletionTime) return false; + if (minTTL != rowStats.minTTL) return false; + if (minTimestamp != rowStats.minTimestamp) return false; + + return true; + } + + @Override + public int hashCode() + { + return Objects.hash(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow); + } + + @Override + public String toString() + { + return String.format("RowStats(ts=%d, ldt=%d, ttl=%d, avgColPerRow=%d)", minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow); + } + + public static class Collector + { + private boolean isTimestampSet; + private long minTimestamp = Long.MAX_VALUE; + + private boolean isDelTimeSet; + private int minDeletionTime = Integer.MAX_VALUE; + + private boolean isTTLSet; + private int minTTL = Integer.MAX_VALUE; + + private boolean isColumnSetPerRowSet; + private long totalColumnsSet; + private long rows; + + public void updateTimestamp(long timestamp) + { + if (timestamp == NO_TIMESTAMP) + return; + + isTimestampSet = 
true; + minTimestamp = Math.min(minTimestamp, timestamp); + } + + public void updateLocalDeletionTime(int deletionTime) + { + if (deletionTime == NO_DELETION_TIME) + return; + + isDelTimeSet = true; + minDeletionTime = Math.min(minDeletionTime, deletionTime); + } + + public void updateDeletionTime(DeletionTime deletionTime) + { + if (deletionTime.isLive()) + return; + + updateTimestamp(deletionTime.markedForDeleteAt()); + updateLocalDeletionTime(deletionTime.localDeletionTime()); + } + + public void updateTTL(int ttl) + { + if (ttl <= NO_TTL) + return; + + isTTLSet = true; + minTTL = Math.min(minTTL, ttl); + } + + public void updateColumnSetPerRow(int columnSetInRow) + { + updateColumnSetPerRow(columnSetInRow, 1); + } + + public void updateColumnSetPerRow(long totalColumnsSet, long rows) + { + if (totalColumnsSet < 0 || rows < 0) + return; + + this.isColumnSetPerRowSet = true; + this.totalColumnsSet += totalColumnsSet; + this.rows += rows; + } + + public RowStats get() + { + return new RowStats(isTimestampSet ? minTimestamp : NO_TIMESTAMP, + isDelTimeSet ? minDeletionTime : NO_DELETION_TIME, + isTTLSet ? minTTL : NO_TTL, + isColumnSetPerRowSet ? (rows == 0 ? 
0 : (int)(totalColumnsSet / rows)) : -1); + } + } + + public static class Serializer + { + public void serialize(RowStats stats, DataOutputPlus out) throws IOException + { + out.writeLong(stats.minTimestamp); + out.writeInt(stats.minLocalDeletionTime); + out.writeInt(stats.minTTL); + out.writeInt(stats.avgColumnSetPerRow); + } + + public int serializedSize(RowStats stats, TypeSizes sizes) + { + return sizes.sizeof(stats.minTimestamp) + + sizes.sizeof(stats.minLocalDeletionTime) + + sizes.sizeof(stats.minTTL) + + sizes.sizeof(stats.avgColumnSetPerRow); + } + + public RowStats deserialize(DataInput in) throws IOException + { + long minTimestamp = in.readLong(); + int minLocalDeletionTime = in.readInt(); + int minTTL = in.readInt(); + int avgColumnSetPerRow = in.readInt(); + return new RowStats(minTimestamp, minLocalDeletionTime, minTTL, avgColumnSetPerRow); + } + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java new file mode 100644 index 0000000..76dcf60 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.*; + +import com.google.common.collect.Iterators; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndexManager; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.SearchIterator; + +/** + * Static utilities to work on Row objects. 
+ */
public final class Rows
{
    // Static utilities only: never instantiated.
    private Rows() {}

    /**
     * A static row with no columns and no cells, {@code LivenessInfo.NONE} as primary key liveness
     * and a {@code DeletionTime.LIVE} deletion.
     * Note: its {@code getCells} returns {@code null}, while its {@code iterator()} returns an empty iterator.
     */
    public static final Row EMPTY_STATIC_ROW = new AbstractRow()
    {
        public Columns columns()
        {
            return Columns.NONE;
        }

        public LivenessInfo primaryKeyLivenessInfo()
        {
            return LivenessInfo.NONE;
        }

        public DeletionTime deletion()
        {
            return DeletionTime.LIVE;
        }

        public boolean isEmpty()
        {
            return true;
        }

        public boolean hasComplexDeletion()
        {
            return false;
        }

        public Clustering clustering()
        {
            return Clustering.STATIC_CLUSTERING;
        }

        public Cell getCell(ColumnDefinition c)
        {
            return null;
        }

        public Cell getCell(ColumnDefinition c, CellPath path)
        {
            return null;
        }

        public Iterator<Cell> getCells(ColumnDefinition c)
        {
            return null;
        }

        public DeletionTime getDeletion(ColumnDefinition c)
        {
            return DeletionTime.LIVE;
        }

        public Iterator<Cell> iterator()
        {
            return Iterators.<Cell>emptyIterator();
        }

        public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
        {
            return new SearchIterator<ColumnDefinition, ColumnData>()
            {
                public boolean hasNext()
                {
                    return false;
                }

                public ColumnData next(ColumnDefinition column)
                {
                    return null;
                }
            };
        }

        public Kind kind()
        {
            return Unfiltered.Kind.ROW;
        }

        public Row takeAlias()
        {
            return this;
        }
    };

    /**
     * Callback notified of the per-cell outcome of a merge.
     */
    public interface SimpleMergeListener
    {
        public void onAdded(Cell newCell);
        public void onRemoved(Cell removedCell);
        public void onUpdated(Cell existingCell, Cell updatedCell);
    }

    /**
     * Writes every value of {@code clustering}, in order, to {@code writer}.
     */
    public static void writeClustering(Clustering clustering, Row.Writer writer)
    {
        for (int i = 0; i < clustering.size(); i++)
            writer.writeClusteringValue(clustering.get(i));
    }

    /**
     * Same as {@link #merge(Row, Row, Columns, Row.Writer, int, SecondaryIndexManager.Updater)} with
     * {@code SecondaryIndexManager.nullUpdater}, discarding the returned timestamp delta.
     */
    public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Writer writer, int nowInSec)
    {
        merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater);
    }

    /**
     * Merges rows in the memtable: {@code existing} and {@code update} are merged into {@code writer}.
     * <p>
     * Writes happen in this order:
     * <ol>
     * <li>the clustering of {@code existing};</li>
     * <li>the merged primary key liveness info, replaced by {@code LivenessInfo.NONE} if the kept row
     * deletion deletes it;</li>
     * <li>the kept row deletion, i.e. whichever of the two row deletions supersedes the other;</li>
     * <li>a reconciled cell for each simple column of {@code mergedColumns};</li>
     * <li>for each complex column, the complex deletion (only if it supersedes the row deletion, which
     * otherwise serves as that column's deletion) followed by the reconciled cells;</li>
     * <li>{@code writer.endOfRow()}.</li>
     * </ol>
     * {@code indexUpdater} is told about the row-level liveness and deletion, and is passed along to
     * every cell reconciliation.
     *
     * @return the minimum timestamp delta between existing and update. It starts as the delta between
     * the existing and merged primary key timestamps and is lowered by every value returned from
     * {@code Cells.reconcile} / {@code Cells.reconcileComplex}.
     */
    public static long merge(Row existing,
                             Row update,
                             Columns mergedColumns,
                             Row.Writer writer,
                             int nowInSec,
                             SecondaryIndexManager.Updater indexUpdater)
    {
        Clustering clustering = existing.clustering();
        writeClustering(clustering, writer);

        LivenessInfo existingInfo = existing.primaryKeyLivenessInfo();
        LivenessInfo updateInfo = update.primaryKeyLivenessInfo();
        LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo);

        // NOTE(review): if existingInfo carries the NO_TIMESTAMP sentinel, this subtraction may overflow
        // depending on the sentinel's value; confirm whether callers of the returned delta care.
        long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp());

        DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion();

        if (deletion.deletes(mergedInfo))
            mergedInfo = LivenessInfo.NONE;

        writer.writePartitionKeyLivenessInfo(mergedInfo);
        writer.writeRowDeletion(deletion);

        indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion);

        for (int i = 0; i < mergedColumns.simpleColumnCount(); i++)
        {
            ColumnDefinition c = mergedColumns.getSimple(i);
            Cell existingCell = existing.getCell(c);
            Cell updateCell = update.getCell(c);
            timeDelta = Math.min(timeDelta, Cells.reconcile(clustering,
                                                            existingCell,
                                                            updateCell,
                                                            deletion,
                                                            writer,
                                                            nowInSec,
                                                            indexUpdater));
        }

        for (int i = 0; i < mergedColumns.complexColumnCount(); i++)
        {
            ColumnDefinition c = mergedColumns.getComplex(i);
            DeletionTime existingDt = existing.getDeletion(c);
            DeletionTime updateDt = update.getDeletion(c);
            DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt;
            // Only write a complex deletion that adds something over the row deletion; otherwise the
            // row deletion is what shadows this column's cells.
            if (maxDt.supersedes(deletion))
                writer.writeComplexDeletion(c, maxDt);
            else
                maxDt = deletion;

            Iterator<Cell> existingCells = existing.getCells(c);
            Iterator<Cell> updateCells = update.getCells(c);
            timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater));
        }

        writer.endOfRow();
        return timeDelta;
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SerializationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java new file mode 100644 index 0000000..56b993c --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.cassandra.db.rows;

import java.nio.ByteBuffer;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.utils.ByteBufferUtil;

/**
 * State used while deserializing rows. It holds:
 * <ul>
 * <li>the serialization {@code version};</li>
 * <li>a {@link Flag} governing how counter values are treated;</li>
 * <li>an optional {@code ColumnFilter} deciding which columns and complex cells are passed on to the writer;</li>
 * <li>the liveness info of the row currently being read.</li>
 * </ul>
 */
public class SerializationHelper
{
    /**
     * Flag affecting deserialization behavior.
     * <p>
     * Within this class, the flag only matters for counter cells in {@code writeCell}:
     * <ul>
     * <li>{@code FROM_REMOTE} (data received from remote hosts) always clears the local shards of the value;</li>
     * <li>{@code LOCAL} (local data) clears them only when {@code CounterContext.shouldClearLocal} says so;</li>
     * <li>{@code PRESERVE_SIZE} leaves values untouched, so that deserializing and reserializing yields
     * the exact same bytes (streaming uses this).</li>
     * </ul>
     * NOTE(review): earlier docs also said LOCAL/FROM_REMOTE convert expired cells to tombstones. That
     * conversion is not done here; presumably it happens elsewhere. TODO confirm.
     */
    public enum Flag
    {
        LOCAL, FROM_REMOTE, PRESERVE_SIZE
    }

    public final int version;
    private final Flag flag;

    // null means "fetch every column".
    private final ColumnFilter columnsToFetch;

    // A single instance reused for every liveness info handed to a Row.Writer.
    private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo();

    // Liveness info (timestamp, ttl and local deletion time) of the row currently being read.
    private long rowTimestamp;
    private int rowTTL;
    private int rowLocalDeletionTime;

    // Only assigned by startOfComplexColumn/endOfComplexColumn; null outside a complex column or without a filter.
    private ColumnFilter.Tester tester;

    public SerializationHelper(int version, Flag flag)
    {
        this(version, flag, null);
    }

    public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch)
    {
        this.version = version;
        this.flag = flag;
        this.columnsToFetch = columnsToFetch;
    }

    /**
     * Passes the row's primary key liveness info to {@code writer} and remembers it as the current row's
     * liveness info (see the {@code getRow*} accessors).
     */
    public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime)
    {
        livenessInfo.setTo(timestamp, ttl, localDeletionTime);
        writer.writePartitionKeyLivenessInfo(livenessInfo);

        rowTimestamp = timestamp;
        rowTTL = ttl;
        rowLocalDeletionTime = localDeletionTime;
    }

    public long getRowTimestamp()
    {
        return rowTimestamp;
    }

    public int getRowTTL()
    {
        return rowTTL;
    }

    public int getRowLocalDeletionTime()
    {
        return rowLocalDeletionTime;
    }

    /** @return whether {@code column} is selected; always true without a column filter. */
    public boolean includes(ColumnDefinition column)
    {
        if (columnsToFetch == null)
            return true;
        return columnsToFetch.includes(column);
    }

    /** @return whether the values of {@code column} may be skipped; always false without a column filter. */
    public boolean canSkipValue(ColumnDefinition column)
    {
        if (columnsToFetch == null)
            return false;
        return columnsToFetch.canSkipValue(column);
    }

    /** Prepares per-cell filtering for the complex column about to be read. */
    public void startOfComplexColumn(ColumnDefinition column)
    {
        if (columnsToFetch == null)
            this.tester = null;
        else
            this.tester = columnsToFetch.newTester(column);
    }

    /** Ends per-cell filtering started by {@link #startOfComplexColumn}. */
    public void endOfComplexColumn(ColumnDefinition column)
    {
        this.tester = null;
    }

    /**
     * Passes a deserialized cell to {@code writer}, after clearing local counter shards as dictated by
     * the {@link Flag}.
     * <p>
     * Cells of a complex column are dropped when the current tester rejects their path. A kept cell's
     * value is replaced by an empty buffer when the tester says it can be skipped.
     */
    public void writeCell(Row.Writer writer,
                          ColumnDefinition column,
                          boolean isCounter,
                          ByteBuffer value,
                          long timestamp,
                          int localDelTime,
                          int ttl,
                          CellPath path)
    {
        livenessInfo.setTo(timestamp, ttl, localDelTime);

        if (isCounter && mustClearLocalShards(value))
            value = CounterContext.instance().clearAllLocal(value);

        boolean selected = !column.isComplex() || tester == null || tester.includes(path);
        if (!selected)
            return;

        if (tester != null && tester.canSkipValue(path))
            value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
        writer.writeCell(column, isCounter, value, livenessInfo, path);
    }

    // Whether the local shards of the given counter value must be cleared, according to the flag.
    private boolean mustClearLocalShards(ByteBuffer counterValue)
    {
        if (flag == Flag.FROM_REMOTE)
            return true;
        return flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(counterValue);
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java new file mode 100644 index 0000000..08f37fd --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.cassandra.db.rows;

import java.nio.ByteBuffer;
import java.util.NoSuchElementException;

import com.google.common.collect.UnmodifiableIterator;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.ObjectSizes;

/**
 * Holds cells data for the simple columns of one or more rows.
 * <p>
 * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and
 * the (simple) columns the {@code SimpleRowDataBlock} has data for. The cell for row {@code i}
 * and simple column {@code c} is stored in the {@code CellData} at index
 * {@code i * columns.simpleColumnCount() + index(c)}, where {@code index(c)} is the position
 * of {@code c} among the simple columns.
 * <p>
 * This means cells are stored in a "dense" way: if a column doesn't have a cell for a given row,
 * the corresponding index in the cell data array simply has a {@code null} value.
 * We might want to switch to a sparser encoding in the future, but we keep it simple for
 * now. A sparse encoding makes things a tad more complex, because we need to be able to
 * swap the cells of 2 given rows, as seen in ComplexRowDataBlock.
 */
public class SimpleRowDataBlock
{
    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false));

    final Columns columns;
    final CellData data;

    /**
     * @param columns the columns of the rows; only the simple ones are stored here.
     * @param rows the number of rows to size the cell data for.
     * @param isCounter passed through to the underlying {@code CellData}.
     */
    public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter)
    {
        this.columns = columns;
        this.data = new CellData(rows * columns.simpleColumnCount(), isCounter);
    }

    public Columns columns()
    {
        return columns;
    }

    // Swap row i and j
    public void swap(int i, int j)
    {
        int s = columns.simpleColumnCount();
        for (int k = 0; k < s; k++)
            data.swapCell(i * s + k, j * s + k);
    }

    // Merge row i into j
    public void merge(int i, int j, int nowInSec)
    {
        int s = columns.simpleColumnCount();
        for (int k = 0; k < s; k++)
            data.mergeCell(i * s + k, j * s + k, nowInSec);
    }

    // Move row i into j
    public void move(int i, int j)
    {
        int s = columns.simpleColumnCount();
        for (int k = 0; k < s; k++)
            data.moveCell(i * s + k, j * s + k);
    }

    public long unsharedHeapSizeExcludingData()
    {
        return EMPTY_SIZE + data.unsharedHeapSizeExcludingData();
    }

    public int dataSize()
    {
        return data.dataSize();
    }

    /**
     * @param inOrderCells if true, cells of a row are expected in column order, letting each column
     * lookup resume from the previous one instead of starting from the first simple column.
     */
    public CellWriter cellWriter(boolean inOrderCells)
    {
        return new CellWriter(inOrderCells);
    }

    public static CellData.ReusableCell reusableCell()
    {
        return new CellData.ReusableCell();
    }

    public static ReusableIterator reusableIterator()
    {
        return new ReusableIterator();
    }

    public void clear()
    {
        data.clear();
    }

    /**
     * Iterator over the non-empty simple cells of one row of a block. It can be re-pointed with
     * {@link #setTo}, and the same {@code Cell} instance is returned by every call to {@link #next}.
     */
    static class ReusableIterator extends UnmodifiableIterator<Cell>
    {
        private SimpleRowDataBlock dataBlock;
        private final CellData.ReusableCell cell = new CellData.ReusableCell();

        private int base;
        private int column;

        private ReusableIterator()
        {
        }

        /** Points this iterator at row {@code row} of {@code dataBlock}; a null block yields no cells. */
        public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row)
        {
            this.dataBlock = dataBlock;
            this.base = dataBlock == null ? -1 : row * dataBlock.columns.simpleColumnCount();
            this.column = 0;
            return this;
        }

        public boolean hasNext()
        {
            if (dataBlock == null)
                return false;

            int columnCount = dataBlock.columns.simpleColumnCount();
            // iterate over column until we find one with data
            while (column < columnCount && !dataBlock.data.hasCell(base + column))
                ++column;

            return column < columnCount;
        }

        public Cell next()
        {
            // hasNext() only skips empty slots and is idempotent, so calling it here is harmless when
            // the caller already did. It also guarantees we never expose an empty slot and that we
            // honour the Iterator contract once exhausted (instead of returning garbage or NPE-ing).
            if (!hasNext())
                throw new NoSuchElementException();

            cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column);
            ++column;
            return cell;
        }
    }

    /**
     * Writes cells into this block, one row after the other; {@link #endOfRow} moves to the next row.
     */
    public class CellWriter
    {
        private final boolean inOrderCells;

        // Index in the cell data of the first cell of the row being written.
        private int base;
        private int lastColumnIdx;

        public CellWriter(boolean inOrderCells)
        {
            this.inOrderCells = inOrderCells;
        }

        public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info)
        {
            int fromIdx = inOrderCells ? lastColumnIdx : 0;
            lastColumnIdx = columns.simpleIdx(column, fromIdx);
            assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx;
            int idx = base + lastColumnIdx;
            data.setCell(idx, value, info);
        }

        public void reset()
        {
            base = 0;
            lastColumnIdx = 0;
            data.clear();
        }

        public void endOfRow()
        {
            base += columns.simpleColumnCount();
            lastColumnIdx = 0;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java new file mode 100644 index 0000000..2250ee9 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/SliceableUnfilteredRowIterator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.cassandra.db.rows;

import java.util.Iterator;

import org.apache.cassandra.db.Slice;

/**
 * An {@link UnfilteredRowIterator} whose content can also be consumed slice by slice.
 */
public interface SliceableUnfilteredRowIterator extends UnfilteredRowIterator
{
    /**
     * Moves forward in the iterator (backward when {@code isReverseOrder()} is true for it) and returns
     * an iterator over the {@code Unfiltered} selected by {@code slice}.
     * <p>
     * Successive calls are allowed as long as the slices do not overlap and are passed in clustering
     * order (reverse clustering order for a reversed iterator). However, a call may leave this iterator
     * in an unknown state: nothing is guaranteed about what {@code hasNext} or {@code next} yield
     * afterwards. In other words, on a given iterator use either {@code slice} or
     * {@code hasNext}/{@code next}, never both.
     *
     * @param slice the slice to select; it must not overlap, and must come after, any slice previously passed.
     * @return an iterator over the {@code Unfiltered} selected by {@code slice}.
     */
    Iterator<Unfiltered> slice(Slice slice);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/StaticRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java new file mode 100644 index 0000000..2ad9fb4 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/StaticRow.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.cassandra.db.rows;

import java.nio.ByteBuffer;
import java.util.Iterator;

import org.apache.cassandra.db.*;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.utils.SearchIterator;

/**
 * The static row of a partition.
 * <p>
 * It always has {@code Clustering.STATIC_CLUSTERING} as clustering and {@code LivenessInfo.NONE}
 * as primary key liveness info. Its cells live in row 0 of a single-row {@code RowDataBlock},
 * which is filled through {@link Builder}.
 */
public class StaticRow extends AbstractRow
{
    private final DeletionTime deletion;
    private final RowDataBlock data;

    private StaticRow(DeletionTime deletion, RowDataBlock data)
    {
        this.deletion = deletion.takeAlias();
        this.data = data;
    }

    public Columns columns()
    {
        return data.columns();
    }

    /** @return the cell of simple column {@code c}, or null if the row has no simple data or no such column. */
    public Cell getCell(ColumnDefinition c)
    {
        assert !c.isComplex();
        if (data.simpleData == null)
            return null;

        int position = columns().simpleIdx(c, 0);
        return position < 0 ? null : SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, position);
    }

    /** @return the cell at {@code path} of complex column {@code c}, or null if there is none. */
    public Cell getCell(ColumnDefinition c, CellPath path)
    {
        assert c.isComplex();

        ComplexRowDataBlock complexBlock = data.complexData;
        if (complexBlock == null)
            return null;

        int position = complexBlock.cellIdx(0, c, path);
        if (position < 0)
            return null;
        return SimpleRowDataBlock.reusableCell().setTo(complexBlock.cellData(0), c, position);
    }

    public Iterator<Cell> getCells(ColumnDefinition c)
    {
        assert c.isComplex();
        return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c);
    }

    public boolean hasComplexDeletion()
    {
        return data.hasComplexDeletion(0);
    }

    /** @return the complex deletion of column {@code c}, or {@code DeletionTime.LIVE} if it has none. */
    public DeletionTime getDeletion(ColumnDefinition c)
    {
        assert c.isComplex();
        if (data.complexData == null)
            return DeletionTime.LIVE;

        int position = data.complexData.complexDeletionIdx(0, c);
        if (position < 0)
            return DeletionTime.LIVE;
        return ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, position);
    }

    public Iterator<Cell> iterator()
    {
        return RowDataBlock.reusableIterator().setTo(data, 0);
    }

    /**
     * Returns a search iterator over this row's columns. Simple columns are looked up starting from the
     * position after the previously found simple column, so they are expected to be requested in order.
     */
    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
    {
        return new SearchIterator<ColumnDefinition, ColumnData>()
        {
            // Index from which the next simple-column lookup starts.
            private int simpleIdx = 0;

            public boolean hasNext()
            {
                // TODO: we can do better, but we expect users to not rely on this anyway
                return true;
            }

            public ColumnData next(ColumnDefinition column)
            {
                if (!column.isComplex())
                    return nextSimple(column);

                // TODO: this is sub-optimal
                Iterator<Cell> cells = getCells(column);
                return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column));
            }

            // Looks up a simple column from the current position and advances past it.
            private ColumnData nextSimple(ColumnDefinition column)
            {
                simpleIdx = columns().simpleIdx(column, simpleIdx);
                assert simpleIdx >= 0;

                Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, simpleIdx);
                ++simpleIdx;
                return cell == null ? null : new ColumnData(column, cell, null, null);
            }
        };
    }

    public Row takeAlias()
    {
        return this;
    }

    public Clustering clustering()
    {
        return Clustering.STATIC_CLUSTERING;
    }

    public LivenessInfo primaryKeyLivenessInfo()
    {
        return LivenessInfo.NONE;
    }

    public DeletionTime deletion()
    {
        return deletion;
    }

    public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter)
    {
        return new Builder(columns, inOrderCells, isCounter);
    }

    /**
     * Writer that builds a {@link StaticRow}. Clustering values are rejected, and only
     * {@code LivenessInfo.NONE} is accepted as partition key liveness info.
     */
    public static class Builder extends RowDataBlock.Writer
    {
        private final RowDataBlock data;
        private DeletionTime deletion = DeletionTime.LIVE;

        public Builder(Columns columns, boolean inOrderCells, boolean isCounter)
        {
            super(inOrderCells);
            this.data = new RowDataBlock(columns, 1, false, isCounter);
            updateWriter(data);
        }

        public void writeClusteringValue(ByteBuffer buffer)
        {
            throw new UnsupportedOperationException();
        }

        public void writePartitionKeyLivenessInfo(LivenessInfo info)
        {
            // Static rows are special and don't really have an existence unless they have live cells,
            // so we shouldn't have any partition key liveness info.
            assert info.equals(LivenessInfo.NONE);
        }

        public void writeRowDeletion(DeletionTime deletion)
        {
            this.deletion = deletion;
        }

        public StaticRow build()
        {
            return new StaticRow(deletion, data);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java new file mode 100644 index 0000000..a1c0ddc --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.cassandra.db.rows; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; + +public class TombstoneFilteringRow extends FilteringRow +{ + private final int nowInSec; + + public TombstoneFilteringRow(int nowInSec) + { + this.nowInSec = nowInSec; + } + + @Override + protected boolean include(DeletionTime dt) + { + return false; + } + + @Override + protected boolean include(Cell cell) + { + return cell.isLive(nowInSec); + } + + @Override + protected boolean include(ColumnDefinition c, DeletionTime dt) + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/Unfiltered.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java new file mode 100644 index 0000000..b1692e3 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.security.MessageDigest; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.Clusterable; + +/** + * Unfiltered is the common class for the main constituent of an unfiltered partition. + * <p> + * In practice, an Unfiltered is either a row or a range tombstone marker. Unfiltereds + * are uniquely identified by their clustering information and can be sorted according + * to those. + */ +public interface Unfiltered extends Clusterable +{ + public enum Kind { ROW, RANGE_TOMBSTONE_MARKER }; + + /** + * The kind of the atom: either row or range tombstone marker. + */ + public Kind kind(); + + /** + * Digest the atom using the provided {@code MessageDigest}. + * + * @param digest the {@code MessageDigest} to use. + */ + public void digest(MessageDigest digest); + + /** + * Validate the data of this atom. + * + * @param metadata the metadata for the table this atom is part of. + * @throws MarshalException if some of the data in this atom is + * invalid (some value is invalid for its column type, or some field + * is nonsensical). + */ + public void validateData(CFMetaData metadata); + + public String toString(CFMetaData metadata); + public String toString(CFMetaData metadata, boolean fullDetails); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java new file mode 100644 index 0000000..a3ecf6d --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; + +/** + * An iterator over the rows of a given partition that also includes deletion informations. + * <p> + * An {@code UnfilteredRowIterator} contains a few partition top-level informations and is an + * iterator of {@code Unfiltered}, that is of either {@code Row} or {@code RangeTombstoneMarker}. + * An implementation of {@code UnfilteredRowIterator} <b>must</b> provide the following + * guarantees: + * 1. the returned {@code Unfiltered} must be in clustering order, or in reverse clustering + * order iff {@link #isReverseOrder} returns true. + * 2. the iterator should not shadow its own data. That is, no deletion + * (partition level deletion, row deletion, range tombstone, complex + * deletion) should delete anything else returned by the iterator (cell, row, ...). + * 3. every "start" range tombstone marker should have a corresponding "end" marker, and no other + * marker should be in-between this start-end pair of marker. Note that due to the + * previous rule this means that between a "start" and a corresponding "end" marker there + * can only be rows that are not deleted by the markers. 
Also note that when iterating + * in reverse order, "end" markers are returned before their "start" counterpart (i.e. + * "start" and "end" are always in the sense of the clustering order). + * + * Note further that the objects returned by next() are only valid until the + * next call to hasNext() or next(). If a consumer wants to keep a reference on + * the returned objects for longer than the iteration, it must make a copy of + * it explicitly. + */ +public interface UnfilteredRowIterator extends Iterator<Unfiltered>, AutoCloseable +{ + /** + * The metadata for the table this iterator on. + */ + public CFMetaData metadata(); + + /** + * A subset of the columns for the (static and regular) rows returned by this iterator. + * Every row returned by this iterator must guarantee that it has only those columns. + */ + public PartitionColumns columns(); + + /** + * Whether or not the atom returned by this iterator are in reversed + * clustering order. + */ + public boolean isReverseOrder(); + + /** + * The partition key of the partition this in an iterator over. + */ + public DecoratedKey partitionKey(); + + /** + * The partition level deletion for the partition this iterate over. + */ + public DeletionTime partitionLevelDeletion(); + + /** + * The static part corresponding to this partition (this can be an empty + * row). + */ + public Row staticRow(); + + /** + * Return "statistics" about what is returned by this iterator. Those are used for + * performance reasons (for delta-encoding for instance) and code should not + * expect those to be exact. + */ + public RowStats stats(); + + public void close(); + + /** + * Returns whether this iterator has no data (including no deletion data). 
+ */ + public default boolean isEmpty() + { + return partitionLevelDeletion().isLive() + && staticRow().isEmpty() + && !hasNext(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java new file mode 100644 index 0000000..13c09d4 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.io.DataInput; +import java.io.IOException; +import java.io.IOError; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Serialize/Deserialize an unfiltered row iterator. + * + * The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized + * until we read the end of the partition (see UnfilteredSerializer for details). The header itself + * is: + * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>] + * where: + * <cfid> is the table cfid. + * <key> is the partition key. + * <flags> contains bit flags. Each flag is set if it's corresponding bit is set. From rightmost + * bit to leftmost one, the flags are: + * - is empty: whether the iterator is empty. If so, nothing follows the <flags> + * - is reversed: whether the iterator is in reversed clustering order + * - has partition deletion: whether or not there is a <partition_deletion> following + * - has static row: whether or not there is a <static_row> following + * - has row estimate: whether or not there is a <row_estimate> following + * <s_header> is the SerializationHeader. More precisely it's + * <min_timetamp><min_localDelTime><min_ttl>[<static_columns>]<columns> + * where: + * - <min_timestamp> is the base timestamp used for delta-encoding timestamps + * - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times + * - <min_ttl> is the base localDeletionTime used for delta-encoding ttls + * - <static_columns> is the static columns if a static row is present. It's + * the number of columns as an unsigned short, followed by the column names. 
+ * - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>. + * <partition_deletion> is the deletion time for the partition (delta-encoded) + * <static_row> is the static row for this partition as serialized by UnfilteredSerializer. + * <row_estimate> is the (potentially estimated) number of rows serialized. This is only use for + * the purpose of some sizing on the receiving end and should not be relied upon too strongly. + * + * !!! Please note that the serialized value depends on the schema and as such should not be used as is if + * it might be deserialized after the schema as changed !!! + * TODO: we should add a flag to include the relevant metadata in the header for commit log etc..... + */ +public class UnfilteredRowIteratorSerializer +{ + protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class); + + private static final int IS_EMPTY = 0x01; + private static final int IS_REVERSED = 0x02; + private static final int HAS_PARTITION_DELETION = 0x04; + private static final int HAS_STATIC_ROW = 0x08; + private static final int HAS_ROW_ESTIMATE = 0x10; + + public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer(); + + public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException + { + serialize(iterator, out, version, -1); + } + + public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException + { + SerializationHeader header = new SerializationHeader(iterator.metadata(), + iterator.columns(), + iterator.stats()); + serialize(iterator, out, header, version, rowEstimate); + } + + public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException + { + CFMetaData.serializer.serialize(iterator.metadata(), out, version); + 
ByteBufferUtil.writeWithLength(iterator.partitionKey().getKey(), out); + + int flags = 0; + if (iterator.isReverseOrder()) + flags |= IS_REVERSED; + + if (iterator.isEmpty()) + { + out.writeByte((byte)(flags | IS_EMPTY)); + return; + } + + DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); + if (!partitionDeletion.isLive()) + flags |= HAS_PARTITION_DELETION; + Row staticRow = iterator.staticRow(); + boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; + if (hasStatic) + flags |= HAS_STATIC_ROW; + + if (rowEstimate >= 0) + flags |= HAS_ROW_ESTIMATE; + + out.writeByte((byte)flags); + + SerializationHeader.serializer.serializeForMessaging(header, out, hasStatic); + + if (!partitionDeletion.isLive()) + writeDelTime(partitionDeletion, header, out); + + if (hasStatic) + UnfilteredSerializer.serializer.serialize(staticRow, header, out, version); + + if (rowEstimate >= 0) + out.writeInt(rowEstimate); + + while (iterator.hasNext()) + UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version); + UnfilteredSerializer.serializer.writeEndOfPartition(out); + } + + // Please note that this consume the iterator, and as such should not be called unless we have a simple way to + // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate + public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate, TypeSizes sizes) + { + SerializationHeader header = new SerializationHeader(iterator.metadata(), + iterator.columns(), + iterator.stats()); + + assert rowEstimate >= 0; + + long size = CFMetaData.serializer.serializedSize(iterator.metadata(), version, sizes) + + sizes.sizeofWithLength(iterator.partitionKey().getKey()) + + 1; // flags + + if (iterator.isEmpty()) + return size; + + DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); + Row staticRow = iterator.staticRow(); + boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; + + size += 
SerializationHeader.serializer.serializedSizeForMessaging(header, sizes, hasStatic); + + if (!partitionDeletion.isLive()) + size += delTimeSerializedSize(partitionDeletion, header, sizes); + + if (hasStatic) + size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version, sizes); + + if (rowEstimate >= 0) + size += sizes.sizeof(rowEstimate); + + while (iterator.hasNext()) + size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version, sizes); + size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(sizes); + + return size; + } + + public Header deserializeHeader(DataInput in, int version, SerializationHelper.Flag flag) throws IOException + { + CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); + DecoratedKey key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithLength(in)); + int flags = in.readUnsignedByte(); + boolean isReversed = (flags & IS_REVERSED) != 0; + if ((flags & IS_EMPTY) != 0) + { + SerializationHeader sh = new SerializationHeader(metadata, PartitionColumns.NONE, RowStats.NO_STATS); + return new Header(sh, metadata, key, isReversed, true, null, null, 0); + } + + boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0; + boolean hasStatic = (flags & HAS_STATIC_ROW) != 0; + boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0; + + SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, hasStatic); + + DeletionTime partitionDeletion = hasPartitionDeletion ? readDelTime(in, header) : DeletionTime.LIVE; + + Row staticRow = Rows.EMPTY_STATIC_ROW; + if (hasStatic) + staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag)); + + int rowEstimate = hasRowEstimate ? 
in.readInt() : -1; + return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate); + } + + public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException + { + while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null); + } + + public UnfilteredRowIterator deserialize(final DataInput in, int version, SerializationHelper.Flag flag) throws IOException + { + final Header h = deserializeHeader(in, version, flag); + + if (h.isEmpty) + return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed); + + final int clusteringSize = h.metadata.clusteringColumns().size(); + final SerializationHelper helper = new SerializationHelper(version, flag); + + return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats()) + { + private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter()); + private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize); + + protected Unfiltered computeNext() + { + try + { + Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset()); + if (kind == null) + return endOfData(); + + return kind == Unfiltered.Kind.ROW ? 
row : markerBuilder.build(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + }; + } + + public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException + { + out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt())); + out.writeInt(header.encodeDeletionTime(dt.localDeletionTime())); + } + + public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header, TypeSizes sizes) + { + return sizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt())) + + sizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime())); + } + + public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException + { + long markedAt = header.decodeTimestamp(in.readLong()); + int localDelTime = header.decodeDeletionTime(in.readInt()); + return new SimpleDeletionTime(markedAt, localDelTime); + } + + public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException + { + // Note that since we might use VINT, we shouldn't assume the size of a long or an int + in.readLong(); + in.readInt(); + } + + public static class Header + { + public final SerializationHeader sHeader; + public final CFMetaData metadata; + public final DecoratedKey key; + public final boolean isReversed; + public final boolean isEmpty; + public final DeletionTime partitionDeletion; + public final Row staticRow; + public final int rowEstimate; // -1 if no estimate + + private Header(SerializationHeader sHeader, + CFMetaData metadata, + DecoratedKey key, + boolean isReversed, + boolean isEmpty, + DeletionTime partitionDeletion, + Row staticRow, + int rowEstimate) + { + this.sHeader = sHeader; + this.metadata = metadata; + this.key = key; + this.isReversed = isReversed; + this.isEmpty = isEmpty; + this.partitionDeletion = partitionDeletion; + this.staticRow = staticRow; + this.rowEstimate = rowEstimate; + } + + @Override + public String toString() + { + return 
String.format("{header=%s, table=%s.%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}", + sHeader, metadata.ksName, metadata.cfName, key, isReversed, isEmpty, partitionDeletion, staticRow.toString(metadata), rowEstimate); + } + } +}
