http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 30923c5..4072f8d 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -17,32 +17,21 @@ */ package org.apache.cassandra.db.rows; -import java.io.DataInput; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.*; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.utils.SearchIterator; /** - * Serialize/deserialize a single Unfiltered for the intra-node protocol. + * Serialize/deserialize a single Unfiltered (both on-wire and on-disk). * - * The encode format for an unfiltered is <flags>(<row>|<marker>) where: + * The encoded format for an unfiltered is <flags>(<row>|<marker>) where: * - * <flags> is a byte whose bits are flags. The rightmost 1st bit is only - * set to indicate the end of the partition. The 2nd bit indicates - * whether the reminder is a range tombstone marker (otherwise it's a row). - * If it's a row then the 3rd bit indicates if it's static, the 4th bit - * indicates the presence of a row timestamp, the 5th the presence of a row - * ttl, the 6th the presence of row deletion and the 7th indicates the - * presence of complex deletion times. + * <flags> is a byte whose bits are flags used by the rest of the serialization. Each + * flag is defined/explained below as the "Unfiltered flags" constants. * <row> is <clustering>[<timestamp>][<ttl>][<deletion>]<sc1>...<sci><cc1>...<ccj> where * <clustering> is the row clustering as serialized by * {@code Clustering.serializer}. Note that static row are an exception and @@ -50,7 +39,7 @@ import org.apache.cassandra.utils.SearchIterator; * whose presence is determined by the flags. <sci> is the simple columns of the row and <ccj> the * complex ones. There is actually 2 slightly different possible layout for those * cell: a dense one and a sparse one. Which one is used depends on the serialization - * header and more precisely of {@link SerializationHeader.useSparseColumnLayout()}: + * header and more precisely of {@link SerializationHeader#useSparseColumnLayout(boolean)}: * 1) in the dense layout, there will be as many <sci> and <ccj> as there is columns * in the serialization header. 
 * Each simple column <sci> will simply be a <cell>
 * (which might have no value, see below), while each <ccj> will be
@@ -84,27 +73,18 @@ import org.apache.cassandra.utils.SearchIterator;
  */
 public class UnfilteredSerializer
 {
-    private static final Logger logger = LoggerFactory.getLogger(UnfilteredSerializer.class);
-
     public static final UnfilteredSerializer serializer = new UnfilteredSerializer();

-    // Unfiltered flags
-    private final static int END_OF_PARTITION = 0x01;
-    private final static int IS_MARKER = 0x02;
-    // For rows
-    private final static int IS_STATIC = 0x04;
-    private final static int HAS_TIMESTAMP = 0x08;
-    private final static int HAS_TTL = 0x10;
-    private final static int HAS_DELETION = 0x20;
-    private final static int HAS_COMPLEX_DELETION = 0x40;
-
-    // Cell flags
-    private final static int PRESENCE_MASK = 0x01;
-    private final static int DELETION_MASK = 0x02;
-    private final static int EXPIRATION_MASK = 0x04;
-    private final static int EMPTY_VALUE_MASK = 0x08;
-    private final static int USE_ROW_TIMESTAMP = 0x10;
-    private final static int USE_ROW_TTL = 0x20;
+    /*
+     * Unfiltered flags constants.
+     */
+    private final static int END_OF_PARTITION = 0x01; // Signals the end of the partition. Nothing follows a <flags> field with that flag.
+    private final static int IS_MARKER = 0x02; // Whether the encoded unfiltered is a marker or a row. All following flags apply only to rows.
+    private final static int IS_STATIC = 0x04; // Whether the encoded row is static.
+    private final static int HAS_TIMESTAMP = 0x08; // Whether the encoded row has a timestamp (i.e. if !row.primaryKeyLivenessInfo().isEmpty()).
+    private final static int HAS_TTL = 0x10; // Whether the encoded row has some expiration info (i.e. if row.primaryKeyLivenessInfo().isExpiring()).
+    private final static int HAS_DELETION = 0x20; // Whether the encoded row has some deletion info.
+    private final static int HAS_COMPLEX_DELETION = 0x40; // Whether the encoded row has some complex deletion for at least one of its columns.
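The flag byte above is plain bit arithmetic, so its encoding and decoding can be illustrated standalone. A minimal sketch (the constants mirror the ones in this hunk; the class and main() are illustrative only, not part of the patch):

    public class UnfilteredFlagsSketch
    {
        private final static int END_OF_PARTITION = 0x01;
        private final static int IS_MARKER = 0x02;
        private final static int IS_STATIC = 0x04;
        private final static int HAS_TIMESTAMP = 0x08;
        private final static int HAS_TTL = 0x10;
        private final static int HAS_DELETION = 0x20;

        public static void main(String[] args)
        {
            // Compose the flags for a regular (non-static) row that carries a timestamp and a TTL.
            int flags = HAS_TIMESTAMP | HAS_TTL;

            // Decoding mirrors the single-bit tests used by UnfilteredSerializer.
            assert (flags & END_OF_PARTITION) == 0; // more unfiltereds follow in the partition
            assert (flags & IS_MARKER) == 0;        // a row, not a range tombstone marker
            assert (flags & IS_STATIC) == 0;        // a clustered row, not the static row
            assert (flags & HAS_TTL) != 0;          // a ttl and local expiration time follow on the wire
            assert (flags & HAS_DELETION) == 0;     // no row deletion is encoded
        }
    }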
public void serialize(Unfiltered unfiltered, SerializationHeader header, DataOutputPlus out, int version) throws IOException @@ -131,9 +111,9 @@ public class UnfilteredSerializer if (isStatic) flags |= IS_STATIC; - if (pkLiveness.hasTimestamp()) + if (!pkLiveness.isEmpty()) flags |= HAS_TIMESTAMP; - if (pkLiveness.hasTTL()) + if (pkLiveness.isExpiring()) flags |= HAS_TTL; if (!deletion.isLive()) flags |= HAS_DELETION; @@ -149,7 +129,7 @@ public class UnfilteredSerializer if ((flags & HAS_TTL) != 0) { out.writeInt(header.encodeTTL(pkLiveness.ttl())); - out.writeInt(header.encodeDeletionTime(pkLiveness.localDeletionTime())); + out.writeInt(header.encodeDeletionTime(pkLiveness.localExpirationTime())); } if ((flags & HAS_DELETION) != 0) UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out); @@ -160,52 +140,49 @@ public class UnfilteredSerializer SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator(); for (int i = 0; i < simpleCount; i++) - writeSimpleColumn(i, cells.next(columns.getSimple(i)), header, out, pkLiveness, useSparse); + writeSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, out, useSparse); for (int i = simpleCount; i < columns.columnCount(); i++) - writeComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, out, pkLiveness, useSparse); + writeComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, out, useSparse); if (useSparse) out.writeShort(-1); } - private void writeSimpleColumn(int idx, ColumnData data, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse) + private void writeSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse) throws IOException { if (useSparse) { - if (data == null) + if (cell == null) return; out.writeShort(idx); } - - writeCell(data == null ? null : data.cell(), header, out, rowLiveness); + Cell.serializer.serialize(cell, out, rowLiveness, header); } - private void writeComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness, boolean useSparse) + private void writeComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out, boolean useSparse) throws IOException { - Iterator<Cell> cells = data == null ? null : data.cells(); - DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion(); - if (useSparse) { - assert hasComplexDeletion || deletion.isLive(); - if (cells == null && deletion.isLive()) + if (data == null) return; out.writeShort(idx); } if (hasComplexDeletion) - UnfilteredRowIteratorSerializer.writeDelTime(deletion, header, out); + UnfilteredRowIteratorSerializer.writeDelTime(data == null ? 
DeletionTime.LIVE : data.complexDeletion(), header, out); - if (cells != null) - while (cells.hasNext()) - writeCell(cells.next(), header, out, rowLiveness); + if (data != null) + { + for (Cell cell : data) + Cell.serializer.serialize(cell, out, rowLiveness, header); + } - writeCell(null, header, out, rowLiveness); + Cell.serializer.serialize(null, out, rowLiveness, header); } public void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, int version) @@ -245,12 +222,12 @@ public class UnfilteredSerializer if (!isStatic) size += Clustering.serializer.serializedSize(row.clustering(), version, header.clusteringTypes()); - if (pkLiveness.hasTimestamp()) + if (!pkLiveness.isEmpty()) size += TypeSizes.sizeof(header.encodeTimestamp(pkLiveness.timestamp())); - if (pkLiveness.hasTTL()) + if (pkLiveness.isExpiring()) { size += TypeSizes.sizeof(header.encodeTTL(pkLiveness.ttl())); - size += TypeSizes.sizeof(header.encodeDeletionTime(pkLiveness.localDeletionTime())); + size += TypeSizes.sizeof(header.encodeDeletionTime(pkLiveness.localExpirationTime())); } if (!deletion.isLive()) size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header); @@ -261,10 +238,10 @@ public class UnfilteredSerializer SearchIterator<ColumnDefinition, ColumnData> cells = row.searchIterator(); for (int i = 0; i < simpleCount; i++) - size += sizeOfSimpleColumn(i, cells.next(columns.getSimple(i)), header, pkLiveness, useSparse); + size += sizeOfSimpleColumn(i, (Cell)cells.next(columns.getSimple(i)), pkLiveness, header, useSparse); for (int i = simpleCount; i < columns.columnCount(); i++) - size += sizeOfComplexColumn(i, cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, header, pkLiveness, useSparse); + size += sizeOfComplexColumn(i, (ComplexColumnData)cells.next(columns.getComplex(i - simpleCount)), hasComplexDeletion, pkLiveness, header, useSparse); if (useSparse) size += TypeSizes.sizeof((short)-1); @@ -272,41 +249,40 @@ public class UnfilteredSerializer return size; } - private long sizeOfSimpleColumn(int idx, ColumnData data, SerializationHeader header, LivenessInfo rowLiveness, boolean useSparse) + private long sizeOfSimpleColumn(int idx, Cell cell, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse) { long size = 0; if (useSparse) { - if (data == null) + if (cell == null) return size; size += TypeSizes.sizeof((short)idx); } - return size + sizeOfCell(data == null ? null : data.cell(), header, rowLiveness); + return size + Cell.serializer.serializedSize(cell, rowLiveness, header); } - private long sizeOfComplexColumn(int idx, ColumnData data, boolean hasComplexDeletion, SerializationHeader header, LivenessInfo rowLiveness, boolean useSparse) + private long sizeOfComplexColumn(int idx, ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, boolean useSparse) { long size = 0; - Iterator<Cell> cells = data == null ? null : data.cells(); - DeletionTime deletion = data == null ? DeletionTime.LIVE : data.complexDeletion(); if (useSparse) { - assert hasComplexDeletion || deletion.isLive(); - if (cells == null && deletion.isLive()) + if (data == null) return size; size += TypeSizes.sizeof((short)idx); } if (hasComplexDeletion) - size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(deletion, header); + size += UnfilteredRowIteratorSerializer.delTimeSerializedSize(data == null ? 
DeletionTime.LIVE : data.complexDeletion(), header); - if (cells != null) - while (cells.hasNext()) - size += sizeOfCell(cells.next(), header, rowLiveness); + if (data != null) + { + for (Cell cell : data) + size += Cell.serializer.serializedSize(cell, rowLiveness, header); + } - return size + sizeOfCell(null, header, rowLiveness); + return size + Cell.serializer.serializedSize(null, rowLiveness, header); } public long serializedSize(RangeTombstoneMarker marker, SerializationHeader header, int version) @@ -337,155 +313,157 @@ public class UnfilteredSerializer return 1; } - public Unfiltered.Kind deserialize(DataInput in, - SerializationHeader header, - SerializationHelper helper, - Row.Writer rowWriter, - RangeTombstoneMarker.Writer markerWriter) + public Unfiltered deserialize(DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder) throws IOException { + // It wouldn't be wrong per-se to use an unsorted builder, but it would be inefficient so make sure we don't do it by mistake + assert builder.isSorted(); + int flags = in.readUnsignedByte(); if (isEndOfPartition(flags)) return null; if (kind(flags) == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER) { - RangeTombstone.Bound.Kind kind = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes(), markerWriter); - deserializeMarkerBody(in, header, kind.isBoundary(), markerWriter); - return Unfiltered.Kind.RANGE_TOMBSTONE_MARKER; + RangeTombstone.Bound bound = RangeTombstone.Bound.serializer.deserialize(in, helper.version, header.clusteringTypes()); + return deserializeMarkerBody(in, header, bound); } else { assert !isStatic(flags); // deserializeStaticRow should be used for that. - Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes(), rowWriter); - deserializeRowBody(in, header, helper, flags, rowWriter); - return Unfiltered.Kind.ROW; + builder.newRow(Clustering.serializer.deserialize(in, helper.version, header.clusteringTypes())); + return deserializeRowBody(in, header, helper, flags, builder); } } - public Row deserializeStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) + public Row deserializeStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException { int flags = in.readUnsignedByte(); - assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags); - StaticRow.Builder builder = StaticRow.builder(header.columns().statics, true, header.columns().statics.hasCounters()); - deserializeRowBody(in, header, helper, flags, builder); - return builder.build(); - } - - public void skipStaticRow(DataInput in, SerializationHeader header, SerializationHelper helper) throws IOException - { - int flags = in.readUnsignedByte(); - assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + flags; - skipRowBody(in, header, helper, flags); + assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : flags; + Row.Builder builder = ArrayBackedRow.sortedBuilder(helper.fetchedStaticColumns(header)); + builder.newRow(Clustering.STATIC_CLUSTERING); + return deserializeRowBody(in, header, helper, flags, builder); } - public void deserializeMarkerBody(DataInput in, - SerializationHeader header, - boolean isBoundary, - RangeTombstoneMarker.Writer writer) + public RangeTombstoneMarker deserializeMarkerBody(DataInputPlus in, SerializationHeader header, RangeTombstone.Bound bound) throws IOException { - if 
(isBoundary) - writer.writeBoundaryDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header)); - else - writer.writeBoundDeletion(UnfilteredRowIteratorSerializer.readDelTime(in, header)); - writer.endOfMarker(); - } - - public void skipMarkerBody(DataInput in, SerializationHeader header, boolean isBoundary) throws IOException - { - if (isBoundary) - { - UnfilteredRowIteratorSerializer.skipDelTime(in, header); - UnfilteredRowIteratorSerializer.skipDelTime(in, header); - } + if (bound.isBoundary()) + return new RangeTombstoneBoundaryMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header), UnfilteredRowIteratorSerializer.readDelTime(in, header)); else - { - UnfilteredRowIteratorSerializer.skipDelTime(in, header); - } + return new RangeTombstoneBoundMarker(bound, UnfilteredRowIteratorSerializer.readDelTime(in, header)); } - public void deserializeRowBody(DataInput in, - SerializationHeader header, - SerializationHelper helper, - int flags, - Row.Writer writer) + public Row deserializeRowBody(DataInputPlus in, + SerializationHeader header, + SerializationHelper helper, + int flags, + Row.Builder builder) throws IOException { - boolean isStatic = isStatic(flags); - boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; - boolean hasTTL = (flags & HAS_TTL) != 0; - boolean hasDeletion = (flags & HAS_DELETION) != 0; - boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0; - - long timestamp = hasTimestamp ? header.decodeTimestamp(in.readLong()) : LivenessInfo.NO_TIMESTAMP; - int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL; - int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME; - DeletionTime deletion = hasDeletion ? UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE; + try + { + boolean isStatic = isStatic(flags); + boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; + boolean hasTTL = (flags & HAS_TTL) != 0; + boolean hasDeletion = (flags & HAS_DELETION) != 0; + boolean hasComplexDeletion = (flags & HAS_COMPLEX_DELETION) != 0; + + LivenessInfo rowLiveness = LivenessInfo.EMPTY; + if (hasTimestamp) + { + long timestamp = header.decodeTimestamp(in.readLong()); + int ttl = hasTTL ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL; + int localDeletionTime = hasTTL ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_EXPIRATION_TIME; + rowLiveness = LivenessInfo.create(timestamp, ttl, localDeletionTime); + } - helper.writePartitionKeyLivenessInfo(writer, timestamp, ttl, localDeletionTime); - writer.writeRowDeletion(deletion); + builder.addPrimaryKeyLivenessInfo(rowLiveness); + builder.addRowDeletion(hasDeletion ? 
UnfilteredRowIteratorSerializer.readDelTime(in, header) : DeletionTime.LIVE);

-        Columns columns = header.columns(isStatic);
-        if (header.useSparseColumnLayout(isStatic))
-        {
-            int count = columns.columnCount();
-            int simpleCount = columns.simpleColumnCount();
-            int i;
-            while ((i = in.readShort()) >= 0)
+            Columns columns = header.columns(isStatic);
+            if (header.useSparseColumnLayout(isStatic))
             {
-                if (i > count)
-                    throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+                int count = columns.columnCount();
+                int simpleCount = columns.simpleColumnCount();
+                int i;
+                while ((i = in.readShort()) >= 0)
+                {
+                    if (i > count)
+                        throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count));
+
+                    if (i < simpleCount)
+                        readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);
+                    else
+                        readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, builder, rowLiveness);
+                }
+            }
+            else
+            {
+                for (int i = 0; i < columns.simpleColumnCount(); i++)
+                    readSimpleColumn(columns.getSimple(i), in, header, helper, builder, rowLiveness);

-                if (i < simpleCount)
-                    readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
-                else
-                    readComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion, writer);
+                for (int i = 0; i < columns.complexColumnCount(); i++)
+                    readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, builder, rowLiveness);
             }
+
+            return builder.build();
         }
-        else
+        catch (RuntimeException | AssertionError e)
         {
-            for (int i = 0; i < columns.simpleColumnCount(); i++)
-                readSimpleColumn(columns.getSimple(i), in, header, helper, writer);
-
-            for (int i = 0; i < columns.complexColumnCount(); i++)
-                readComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion, writer);
+            // Corrupted data could be such that it triggers an assertion in the row Builder, or breaks one of its assumptions.
+            // Of course, a bug in said builder could also trigger this, but it's impossible a priori to always make the distinction
+            // between a real bug and data corrupted in just the bad way. Besides, re-throwing as an IOException doesn't hide the
+            // exception, it just makes sure we catch it properly and mark the sstable as corrupted.
+ throw new IOException("Error building row with data deserialized from " + in, e); } - - writer.endOfRow(); } - private void readSimpleColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer) + private void readSimpleColumn(ColumnDefinition column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, Row.Builder builder, LivenessInfo rowLiveness) throws IOException { if (helper.includes(column)) - readCell(column, in, header, helper, writer); + { + Cell cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper); + if (cell != null && !helper.isDropped(cell, false)) + builder.addCell(cell); + } else - skipCell(column, in, header); + { + Cell.serializer.skip(in, column, header); + } } - private void readComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Writer writer) + private void readComplexColumn(ColumnDefinition column, DataInputPlus in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness) throws IOException { if (helper.includes(column)) { helper.startOfComplexColumn(column); - if (hasComplexDeletion) - writer.writeComplexDeletion(column, UnfilteredRowIteratorSerializer.readDelTime(in, header)); + { + DeletionTime complexDeletion = UnfilteredRowIteratorSerializer.readDelTime(in, header); + if (!helper.isDroppedComplexDeletion(complexDeletion)) + builder.addComplexDeletion(column, complexDeletion); + } - while (readCell(column, in, header, helper, writer)); + Cell cell; + while ((cell = Cell.serializer.deserialize(in, rowLiveness, column, header, helper)) != null) + { + if (helper.includes(cell.path()) && !helper.isDropped(cell, true)) + builder.addCell(cell); + } - helper.endOfComplexColumn(column); + helper.endOfComplexColumn(); } else { - skipComplexColumn(column, in, header, helper, hasComplexDeletion); + skipComplexColumn(in, column, header, hasComplexDeletion); } } - public void skipRowBody(DataInput in, SerializationHeader header, SerializationHelper helper, int flags) throws IOException + public void skipRowBody(DataInputPlus in, SerializationHeader header, int flags) throws IOException { boolean isStatic = isStatic(flags); boolean hasTimestamp = (flags & HAS_TIMESTAMP) != 0; @@ -518,28 +496,48 @@ public class UnfilteredSerializer throw new IOException(String.format("Impossible column index %d, the header has only %d columns defined", i, count)); if (i < simpleCount) - skipCell(columns.getSimple(i), in, header); + Cell.serializer.skip(in, columns.getSimple(i), header); else - skipComplexColumn(columns.getComplex(i - simpleCount), in, header, helper, hasComplexDeletion); + skipComplexColumn(in, columns.getComplex(i - simpleCount), header, hasComplexDeletion); } } else { for (int i = 0; i < columns.simpleColumnCount(); i++) - skipCell(columns.getSimple(i), in, header); + Cell.serializer.skip(in, columns.getSimple(i), header); for (int i = 0; i < columns.complexColumnCount(); i++) - skipComplexColumn(columns.getComplex(i), in, header, helper, hasComplexDeletion); + skipComplexColumn(in, columns.getComplex(i), header, hasComplexDeletion); + } + } + + public void skipStaticRow(DataInputPlus in, SerializationHeader header, SerializationHelper helper) throws IOException + { + int flags = in.readUnsignedByte(); + assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW && isStatic(flags) : "Flags is " + 
flags; + skipRowBody(in, header, flags); + } + + public void skipMarkerBody(DataInputPlus in, SerializationHeader header, boolean isBoundary) throws IOException + { + if (isBoundary) + { + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + UnfilteredRowIteratorSerializer.skipDelTime(in, header); + } + else + { + UnfilteredRowIteratorSerializer.skipDelTime(in, header); } } - private void skipComplexColumn(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, boolean hasComplexDeletion) + private void skipComplexColumn(DataInputPlus in, ColumnDefinition column, SerializationHeader header, boolean hasComplexDeletion) throws IOException { if (hasComplexDeletion) UnfilteredRowIteratorSerializer.skipDelTime(in, header); - while (skipCell(column, in, header)); + while (Cell.serializer.skip(in, column, header)); } public static boolean isEndOfPartition(int flags) @@ -556,151 +554,4 @@ public class UnfilteredSerializer { return (flags & IS_MARKER) == 0 && (flags & IS_STATIC) != 0; } - - private void writeCell(Cell cell, SerializationHeader header, DataOutputPlus out, LivenessInfo rowLiveness) - throws IOException - { - if (cell == null) - { - out.writeByte((byte)0); - return; - } - - boolean hasValue = cell.value().hasRemaining(); - boolean isDeleted = cell.isTombstone(); - boolean isExpiring = cell.isExpiring(); - boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp(); - boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime(); - int flags = PRESENCE_MASK; - if (!hasValue) - flags |= EMPTY_VALUE_MASK; - - if (isDeleted) - flags |= DELETION_MASK; - else if (isExpiring) - flags |= EXPIRATION_MASK; - - if (useRowTimestamp) - flags |= USE_ROW_TIMESTAMP; - if (useRowTTL) - flags |= USE_ROW_TTL; - - out.writeByte((byte)flags); - - if (hasValue) - header.getType(cell.column()).writeValue(cell.value(), out); - - if (!useRowTimestamp) - out.writeLong(header.encodeTimestamp(cell.livenessInfo().timestamp())); - - if ((isDeleted || isExpiring) && !useRowTTL) - out.writeInt(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime())); - if (isExpiring && !useRowTTL) - out.writeInt(header.encodeTTL(cell.livenessInfo().ttl())); - - if (cell.column().isComplex()) - cell.column().cellPathSerializer().serialize(cell.path(), out); - } - - private long sizeOfCell(Cell cell, SerializationHeader header, LivenessInfo rowLiveness) - { - long size = 1; // flags - - if (cell == null) - return size; - - boolean hasValue = cell.value().hasRemaining(); - boolean isDeleted = cell.isTombstone(); - boolean isExpiring = cell.isExpiring(); - boolean useRowTimestamp = rowLiveness.hasTimestamp() && cell.livenessInfo().timestamp() == rowLiveness.timestamp(); - boolean useRowTTL = isExpiring && rowLiveness.hasTTL() && cell.livenessInfo().ttl() == rowLiveness.ttl() && cell.livenessInfo().localDeletionTime() == rowLiveness.localDeletionTime(); - - if (hasValue) - size += header.getType(cell.column()).writtenLength(cell.value()); - - if (!useRowTimestamp) - size += TypeSizes.sizeof(header.encodeTimestamp(cell.livenessInfo().timestamp())); - - if ((isDeleted || isExpiring) && !useRowTTL) - size += TypeSizes.sizeof(header.encodeDeletionTime(cell.livenessInfo().localDeletionTime())); - if (isExpiring && !useRowTTL) - size += TypeSizes.sizeof(header.encodeTTL(cell.livenessInfo().ttl())); - - if 
(cell.column().isComplex()) - size += cell.column().cellPathSerializer().serializedSize(cell.path()); - - return size; - } - - private boolean readCell(ColumnDefinition column, DataInput in, SerializationHeader header, SerializationHelper helper, Row.Writer writer) - throws IOException - { - int flags = in.readUnsignedByte(); - if ((flags & PRESENCE_MASK) == 0) - return false; - - boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0; - boolean isDeleted = (flags & DELETION_MASK) != 0; - boolean isExpiring = (flags & EXPIRATION_MASK) != 0; - boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0; - boolean useRowTTL = (flags & USE_ROW_TTL) != 0; - - ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER; - if (hasValue) - { - if (helper.canSkipValue(column)) - header.getType(column).skipValue(in); - else - value = header.getType(column).readValue(in); - } - - long timestamp = useRowTimestamp ? helper.getRowTimestamp() : header.decodeTimestamp(in.readLong()); - - int localDelTime = useRowTTL - ? helper.getRowLocalDeletionTime() - : (isDeleted || isExpiring ? header.decodeDeletionTime(in.readInt()) : LivenessInfo.NO_DELETION_TIME); - - int ttl = useRowTTL - ? helper.getRowTTL() - : (isExpiring ? header.decodeTTL(in.readInt()) : LivenessInfo.NO_TTL); - - CellPath path = column.isComplex() - ? column.cellPathSerializer().deserialize(in) - : null; - - helper.writeCell(writer, column, false, value, timestamp, localDelTime, ttl, path); - - return true; - } - - private boolean skipCell(ColumnDefinition column, DataInput in, SerializationHeader header) - throws IOException - { - int flags = in.readUnsignedByte(); - if ((flags & PRESENCE_MASK) == 0) - return false; - - boolean hasValue = (flags & EMPTY_VALUE_MASK) == 0; - boolean isDeleted = (flags & DELETION_MASK) != 0; - boolean isExpiring = (flags & EXPIRATION_MASK) != 0; - boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP) != 0; - boolean useRowTTL = (flags & USE_ROW_TTL) != 0; - - if (hasValue) - header.getType(column).skipValue(in); - - if (!useRowTimestamp) - in.readLong(); - - if (!useRowTTL && (isDeleted || isExpiring)) - in.readInt(); - - if (!useRowTTL && isExpiring) - in.readInt(); - - if (column.isComplex()) - column.cellPathSerializer().skip(in); - - return true; - } }
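With the per-cell Writer plumbing gone, the read path above reduces to: read a flag byte, then either stop (END_OF_PARTITION), build a marker, or feed a Row.Builder. As a usage sketch, this is roughly how a caller iterates a partition with the new API, assuming in, header and helper are already set up as in SSTableSimpleIterator further down; process(...) is a placeholder for caller logic:

    Row.Builder builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header));
    Unfiltered unfiltered;
    while ((unfiltered = UnfilteredSerializer.serializer.deserialize(in, header, helper, builder)) != null)
    {
        if (unfiltered.kind() == Unfiltered.Kind.ROW)
            process((Row) unfiltered);                  // rows come back fully built via builder.build()
        else
            process((RangeTombstoneMarker) unfiltered); // bound or boundary markers
    }
    // a null return means the END_OF_PARTITION flag was read: nothing else follows for this partition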
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/WrappingRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRow.java b/src/java/org/apache/cassandra/db/rows/WrappingRow.java deleted file mode 100644 index 5a0cc78..0000000 --- a/src/java/org/apache/cassandra/db/rows/WrappingRow.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.util.Collections; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.SearchIterator; - -public abstract class WrappingRow extends AbstractRow -{ - protected Row wrapped; - - private final ReusableIterator cellIterator = new ReusableIterator(); - private final ReusableSearchIterator cellSearchIterator = new ReusableSearchIterator(); - - /** - * Apply some filtering/transformation on cells. This function - * can return {@code null} in which case the cell will be ignored. - */ - protected abstract Cell filterCell(Cell cell); - - protected DeletionTime filterDeletionTime(DeletionTime deletionTime) - { - return deletionTime; - } - - protected ColumnData filterColumnData(ColumnData data) - { - if (data.column().isComplex()) - { - Iterator<Cell> cells = cellIterator.setTo(data.cells()); - DeletionTime dt = filterDeletionTime(data.complexDeletion()); - return cells == null && dt.isLive() - ? null - : new ColumnData(data.column(), null, cells == null ? Collections.emptyIterator(): cells, dt); - } - else - { - Cell filtered = filterCell(data.cell()); - return filtered == null ? null : new ColumnData(data.column(), filtered, null, null); - } - } - - public WrappingRow setTo(Row row) - { - this.wrapped = row; - return this; - } - - public Unfiltered.Kind kind() - { - return Unfiltered.Kind.ROW; - } - - public Clustering clustering() - { - return wrapped.clustering(); - } - - public Columns columns() - { - return wrapped.columns(); - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return wrapped.primaryKeyLivenessInfo(); - } - - public DeletionTime deletion() - { - return wrapped.deletion(); - } - - public boolean hasComplexDeletion() - { - // Note that because cells can be filtered out/transformed through - // filterCell(), we can't rely on wrapped.hasComplexDeletion(). 
- for (int i = 0; i < columns().complexColumnCount(); i++) - if (!getDeletion(columns().getComplex(i)).isLive()) - return true; - return false; - } - - public Cell getCell(ColumnDefinition c) - { - Cell cell = wrapped.getCell(c); - return cell == null ? null : filterCell(cell); - } - - public Cell getCell(ColumnDefinition c, CellPath path) - { - Cell cell = wrapped.getCell(c, path); - return cell == null ? null : filterCell(cell); - } - - public Iterator<Cell> getCells(ColumnDefinition c) - { - Iterator<Cell> cells = wrapped.getCells(c); - if (cells == null) - return null; - - cellIterator.setTo(cells); - return cellIterator.hasNext() ? cellIterator : null; - } - - public DeletionTime getDeletion(ColumnDefinition c) - { - return filterDeletionTime(wrapped.getDeletion(c)); - } - - public Iterator<Cell> iterator() - { - return cellIterator.setTo(wrapped.iterator()); - } - - public SearchIterator<ColumnDefinition, ColumnData> searchIterator() - { - return cellSearchIterator.setTo(wrapped.searchIterator()); - } - - public Row takeAlias() - { - boolean isCounter = columns().hasCounters(); - if (isStatic()) - { - StaticRow.Builder builder = StaticRow.builder(columns(), true, isCounter); - copyTo(builder); - return builder.build(); - } - else - { - ReusableRow copy = new ReusableRow(clustering().size(), columns(), true, isCounter); - copyTo(copy.writer()); - return copy; - } - } - - private class ReusableIterator extends UnmodifiableIterator<Cell> - { - private Iterator<Cell> iter; - private Cell next; - - public ReusableIterator setTo(Iterator<Cell> iter) - { - this.iter = iter; - this.next = null; - return this; - } - - public boolean hasNext() - { - while (next == null && iter.hasNext()) - next = filterCell(iter.next()); - return next != null; - } - - public Cell next() - { - if (next == null && !hasNext()) - throw new NoSuchElementException(); - - Cell result = next; - next = null; - return result; - } - }; - - private class ReusableSearchIterator implements SearchIterator<ColumnDefinition, ColumnData> - { - private SearchIterator<ColumnDefinition, ColumnData> iter; - - public ReusableSearchIterator setTo(SearchIterator<ColumnDefinition, ColumnData> iter) - { - this.iter = iter; - return this; - } - - public boolean hasNext() - { - return iter.hasNext(); - } - - public ColumnData next(ColumnDefinition column) - { - ColumnData data = iter.next(column); - if (data == null) - return null; - - return filterColumnData(data); - } - }; -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java index 680e502..ff3f82c 100644 --- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java @@ -17,16 +17,21 @@ */ package org.apache.cassandra.db.rows; +import java.util.NoSuchElementException; + import com.google.common.collect.UnmodifiableIterator; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; /** - * Abstract class to make writing atom iterators that wrap another iterator + * Abstract class to make writing unfiltered iterators that wrap another iterator * easier. 
By default, the wrapping iterator simply delegates every call to
- * the wrapped iterator so concrete implementations will override some of the
- * methods.
+ * the wrapped iterator so concrete implementations will have to override
+ * some of the methods.
+ * <p>
+ * Note that if most of what you want to do is modifying/filtering the returned
+ * {@code Unfiltered}, {@link AlteringUnfilteredRowIterator} can be a simpler option.
  */
 public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator<Unfiltered> implements UnfilteredRowIterator
 {
@@ -67,6 +72,11 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         return wrapped.staticRow();
     }

+    public RowStats stats()
+    {
+        return wrapped.stats();
+    }
+
     public boolean hasNext()
     {
         return wrapped.hasNext();
     }

@@ -77,11 +87,6 @@ public abstract class WrappingUnfilteredRowIterator extends UnmodifiableIterator
         return wrapped.next();
     }

-    public RowStats stats()
-    {
-        return wrapped.stats();
-    }
-
     public void close()
     {
         wrapped.close();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index 124547a..43e214b 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -209,6 +209,7 @@ public class CQLSSTableWriter implements Closeable
         // Note that we ask indexes not to validate values (the last 'false' arg below) because that triggers a 'Keyspace.open'
         // and that forces a lot of initialization that we don't want.
UpdateParameters params = new UpdateParameters(insert.cfm, + insert.updatedColumns(), options, insert.getTimestamp(now, options), insert.getTimeToLive(options), http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java index 70ab99c..f1af85c 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java @@ -52,7 +52,7 @@ public class SSTableIdentityIterator extends AbstractIterator<Unfiltered> implem try { this.partitionLevelDeletion = DeletionTime.serializer.deserialize(file); - SerializationHelper helper = new SerializationHelper(sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); + SerializationHelper helper = new SerializationHelper(sstable.metadata, sstable.descriptor.version.correspondingMessagingVersion(), SerializationHelper.Flag.LOCAL); this.iterator = SSTableSimpleIterator.create(sstable.metadata, file, sstable.header, helper, partitionLevelDeletion); this.staticRow = iterator.readStaticRow(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java index 9e2faee..56b621a 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.io.sstable; -import java.io.DataInput; import java.io.IOException; import java.io.IOError; import java.util.Iterator; @@ -42,10 +41,10 @@ import org.apache.cassandra.net.MessagingService; public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> implements Iterator<Unfiltered> { protected final CFMetaData metadata; - protected final DataInput in; + protected final DataInputPlus in; protected final SerializationHelper helper; - private SSTableSimpleIterator(CFMetaData metadata, DataInput in, SerializationHelper helper) + private SSTableSimpleIterator(CFMetaData metadata, DataInputPlus in, SerializationHelper helper) { this.metadata = metadata; this.in = in; @@ -66,37 +65,26 @@ public abstract class SSTableSimpleIterator extends AbstractIterator<Unfiltered> { private final SerializationHeader header; - private final ReusableRow row; - private final RangeTombstoneMarker.Builder markerBuilder; + private final Row.Builder builder; - private CurrentFormatIterator(CFMetaData metadata, DataInput in, SerializationHeader header, SerializationHelper helper) + private CurrentFormatIterator(CFMetaData metadata, DataInputPlus in, SerializationHeader header, SerializationHelper helper) { super(metadata, in, helper); this.header = header; - - int clusteringSize = metadata.comparator.size(); - Columns regularColumns = header == null ? 
metadata.partitionColumns().regulars : header.columns().regulars; - - this.row = new ReusableRow(clusteringSize, regularColumns, true, metadata.isCounter()); - this.markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize); + this.builder = ArrayBackedRow.sortedBuilder(helper.fetchedRegularColumns(header)); } public Row readStaticRow() throws IOException { - return header.hasStatic() - ? UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper) - : Rows.EMPTY_STATIC_ROW; + return header.hasStatic() ? UnfilteredSerializer.serializer.deserializeStaticRow(in, header, helper) : Rows.EMPTY_STATIC_ROW; } protected Unfiltered computeNext() { try { - Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, header, helper, row.writer(), markerBuilder.reset()); - - return kind == null - ? endOfData() - : (kind == Unfiltered.Kind.ROW ? row : markerBuilder.build()); + Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, header, helper, builder); + return unfiltered == null ? endOfData() : unfiltered; } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java index 5dbe52a..ef3bde1 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java @@ -32,7 +32,9 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowStats; +import org.apache.cassandra.db.rows.UnfilteredSerializer; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@ -58,6 +60,9 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter private final long bufferSize; private long currentSize; + // Used to compute the row serialized size + private final SerializationHeader header; + private final BlockingQueue<Buffer> writeQueue = new SynchronousQueue<Buffer>(); private final DiskWriter diskWriter = new DiskWriter(); @@ -65,6 +70,7 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter { super(directory, metadata, partitioner, columns); this.bufferSize = bufferSizeInMB * 1024L * 1024L; + this.header = new SerializationHeader(metadata, columns, RowStats.NO_STATS); diskWriter.start(); } @@ -76,42 +82,21 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter if (previous == null) { previous = createPartitionUpdate(key); - count(PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion())); + currentSize += PartitionUpdate.serializer.serializedSize(previous, formatType.info.getLatestVersion().correspondingMessagingVersion()); previous.allowNewUpdates(); buffer.put(key, previous); } return previous; } - private void count(long size) - { - currentSize += size; - } - - private void countCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info, CellPath path) + private 
void countRow(Row row) { - // Note that the accounting of a cell is a bit inaccurate (it doesn't take some of the file format optimization into account) + // Note that the accounting of a row is a bit inaccurate (it doesn't take some of the file format optimization into account) // and the maintaining of the bufferSize is in general not perfect. This has always been the case for this class but we should // improve that. In particular, what we count is closer to the serialized value, but it's debatable that it's the right thing // to count since it will take a lot more space in memory and the bufferSize if first and foremost used to avoid OOM when // using this writer. - - count(1); // Each cell has a byte flag on disk - - if (value.hasRemaining()) - count(column.type.writtenLength(value)); - - count(8); // timestamp - if (info.hasLocalDeletionTime()) - count(4); - if (info.hasTTL()) - count(4); - - if (path != null) - { - assert path.size() == 1; - count(2 + path.get(0).remaining()); - } + currentSize += UnfilteredSerializer.serializer.serializedSize(row, header, formatType.info.getLatestVersion().correspondingMessagingVersion()); } private void maybeSync() throws SyncException @@ -134,52 +119,11 @@ class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter return new PartitionUpdate(metadata, key, columns, 4) { @Override - protected StaticWriter createStaticWriter() + public void add(Row row) { - return new StaticWriter() - { - @Override - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - super.writeCell(column, isCounter, value, info, path); - countCell(column, value, info, path); - } - - @Override - public void endOfRow() - { - super.endOfRow(); - maybeSync(); - } - }; - } - - @Override - protected Writer createWriter() - { - return new RegularWriter() - { - @Override - public void writeClusteringValue(ByteBuffer value) - { - super.writeClusteringValue(value); - count(2 + value.remaining()); - } - - @Override - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - super.writeCell(column, isCounter, value, info, path); - countCell(column, value, info, path); - } - - @Override - public void endOfRow() - { - super.endOfRow(); - maybeSync(); - } - }; + super.add(row); + countRow(row); + maybeSync(); } }; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index a991d99..6759293 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -18,7 +18,6 @@ package org.apache.cassandra.io.sstable.format.big; import java.io.*; -import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -30,7 +29,6 @@ import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.IPartitioner; @@ -54,11 +52,8 @@ public class BigTableWriter extends 
SSTableWriter { private static final Logger logger = LoggerFactory.getLogger(BigTableWriter.class); - // not very random, but the only value that can't be mistaken for a legal column-name length - public static final int END_OF_ROW = 0x0000; - private final IndexWriter iwriter; - private SegmentedFile.Builder dbuilder; + private final SegmentedFile.Builder dbuilder; private final SequentialWriter dataFile; private DecoratedKey lastWrittenKey; private FileMark dataMark; @@ -101,8 +96,8 @@ public class BigTableWriter extends SSTableWriter private long beforeAppend(DecoratedKey decoratedKey) { assert decoratedKey != null : "Keys must not be null"; // empty keys ARE allowed b/c of indexed column values - if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) - throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); + //if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) + // throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); } @@ -172,11 +167,10 @@ public class BigTableWriter extends SSTableWriter } } - private static class StatsCollector extends WrappingUnfilteredRowIterator + private static class StatsCollector extends AlteringUnfilteredRowIterator { - private int cellCount; private final MetadataCollector collector; - private final Set<ColumnDefinition> complexColumnsSetInRow = new HashSet<>(); + private int cellCount; StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector) { @@ -186,55 +180,36 @@ public class BigTableWriter extends SSTableWriter } @Override - public Unfiltered next() + protected Row computeNextStatic(Row row) + { + if (!row.isEmpty()) + cellCount += Rows.collectStats(row, collector); + return row; + } + + @Override + protected Row computeNext(Row row) { - Unfiltered unfiltered = super.next(); - collector.updateClusteringValues(unfiltered.clustering()); + collector.updateClusteringValues(row.clustering()); + cellCount += Rows.collectStats(row, collector); + return row; + } - switch (unfiltered.kind()) + @Override + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + collector.updateClusteringValues(marker.clustering()); + if (marker.isBoundary()) + { + RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; + collector.update(bm.endDeletionTime()); + collector.update(bm.startDeletionTime()); + } + else { - case ROW: - Row row = (Row) unfiltered; - collector.update(row.primaryKeyLivenessInfo()); - collector.update(row.deletion()); - - int simpleColumnsSet = 0; - complexColumnsSetInRow.clear(); - - for (Cell cell : row) - { - if (cell.column().isComplex()) - complexColumnsSetInRow.add(cell.column()); - else - ++simpleColumnsSet; - - ++cellCount; - collector.update(cell.livenessInfo()); - - if (cell.isCounterCell()) - collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell)); - } - - for (int i = 0; i < row.columns().complexColumnCount(); i++) - collector.update(row.getDeletion(row.columns().getComplex(i))); - - collector.updateColumnSetPerRow(simpleColumnsSet + complexColumnsSetInRow.size()); - - break; - case RANGE_TOMBSTONE_MARKER: - if (((RangeTombstoneMarker) unfiltered).isBoundary()) - { - RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)unfiltered; - collector.update(bm.endDeletionTime()); - 
collector.update(bm.startDeletionTime()); - } - else - { - collector.update(((RangeTombstoneBoundMarker)unfiltered).deletionTime()); - } - break; + collector.update(((RangeTombstoneBoundMarker)marker).deletionTime()); } - return unfiltered; + return marker; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 2574c62..9b06b53 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -33,6 +33,8 @@ import com.clearspring.analytics.stream.cardinality.ICardinality; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; +import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -41,7 +43,7 @@ import org.apache.cassandra.utils.EstimatedHistogram; import org.apache.cassandra.utils.MurmurHash; import org.apache.cassandra.utils.StreamingHistogram; -public class MetadataCollector +public class MetadataCollector implements PartitionStatisticsCollector { public static final double NO_COMPRESSION_RATIO = -1.0; @@ -89,8 +91,8 @@ public class MetadataCollector protected EstimatedHistogram estimatedCellPerPartitionCount = defaultCellPerPartitionCountHistogram(); protected ReplayPosition replayPosition = ReplayPosition.NONE; protected final MinMaxLongTracker timestampTracker = new MinMaxLongTracker(); - protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(LivenessInfo.NO_DELETION_TIME, LivenessInfo.NO_DELETION_TIME); - protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(LivenessInfo.NO_TTL, LivenessInfo.NO_TTL); + protected final MinMaxIntTracker localDeletionTimeTracker = new MinMaxIntTracker(Cell.NO_DELETION_TIME, Cell.NO_DELETION_TIME); + protected final MinMaxIntTracker ttlTracker = new MinMaxIntTracker(Cell.NO_TTL, Cell.NO_TTL); protected double compressionRatio = NO_COMPRESSION_RATIO; protected Set<Integer> ancestors = new HashSet<>(); protected StreamingHistogram estimatedTombstoneDropTime = defaultTombstoneDropTimeHistogram(); @@ -178,34 +180,39 @@ public class MetadataCollector return this; } - public MetadataCollector update(LivenessInfo newInfo) + public void update(LivenessInfo newInfo) { - // If the info doesn't have a timestamp, this means the info is basically irrelevant (it's a row - // update whose only info we care are the cells info basically). 
- if (newInfo.hasTimestamp()) + if (newInfo.isEmpty()) + return; + + updateTimestamp(newInfo.timestamp()); + if (newInfo.isExpiring()) { - updateTimestamp(newInfo.timestamp()); updateTTL(newInfo.ttl()); - updateLocalDeletionTime(newInfo.localDeletionTime()); + updateLocalDeletionTime(newInfo.localExpirationTime()); } - return this; } - public MetadataCollector update(DeletionTime dt) + public void update(Cell cell) + { + updateTimestamp(cell.timestamp()); + updateTTL(cell.ttl()); + updateLocalDeletionTime(cell.localDeletionTime()); + } + + public void update(DeletionTime dt) { if (!dt.isLive()) { updateTimestamp(dt.markedForDeleteAt()); updateLocalDeletionTime(dt.localDeletionTime()); } - return this; } - public MetadataCollector updateColumnSetPerRow(long columnSetInRow) + public void updateColumnSetPerRow(long columnSetInRow) { totalColumnsSet += columnSetInRow; ++totalRows; - return this; } private void updateTimestamp(long newTimestamp) @@ -279,10 +286,9 @@ public class MetadataCollector return b2; } - public MetadataCollector updateHasLegacyCounterShards(boolean hasLegacyCounterShards) + public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards) { this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; - return this; } public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, SerializationHeader header) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 879a505..50644bb 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -371,18 +371,18 @@ public final class LegacySchemaMigrator if (isSuper) { - defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true), null)); + defs.add(ColumnDefinition.regularDef(ksName, cfName, CompactTables.SUPER_COLUMN_MAP_COLUMN_STR, MapType.getInstance(subComparator, defaultValidator, true))); } else if (isStaticCompactTable) { defs.add(ColumnDefinition.clusteringKeyDef(ksName, cfName, names.defaultClusteringName(), rawComparator, null)); - defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator, null)); + defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), defaultValidator)); } else { // For dense compact tables, we get here if we don't have a compact value column, in which case we should add it // (we use EmptyType to recognize that the compact value was not declared by the use (see CreateTableStatement too)) - defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance, null)); + defs.add(ColumnDefinition.regularDef(ksName, cfName, names.defaultCompactValueName(), EmptyType.instance)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 4228a46..2eb0ac0 100644 --- 
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -353,7 +353,7 @@ public final class SchemaKeyspace
                 mutationMap.put(key, mutation);
             }
 
-            mutation.add(UnfilteredRowIterators.toUpdate(partition));
+            mutation.add(PartitionUpdate.fromIterator(partition));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index f164a60..a9024e3 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -39,7 +39,7 @@ import org.apache.cassandra.utils.FBUtilities;
 
 public class DataResolver extends ResponseResolver
 {
-    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<AsyncOneResponse>());
+    private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
 
     public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
     {
@@ -162,9 +162,8 @@
         private final boolean isReversed;
         private final PartitionUpdate[] repairs = new PartitionUpdate[sources.length];
 
-        private final Row.Writer[] currentRows = new Row.Writer[sources.length];
-        private Clustering currentClustering;
-        private ColumnDefinition currentColumn;
+        private final Row.Builder[] currentRows = new Row.Builder[sources.length];
+        private final RowDiffListener diffListener;
 
         private final Slice.Bound[] markerOpen = new Slice.Bound[sources.length];
         private final DeletionTime[] markerTime = new DeletionTime[sources.length];
@@ -174,87 +173,75 @@
             this.partitionKey = partitionKey;
             this.columns = columns;
             this.isReversed = isReversed;
-        }
 
-        private PartitionUpdate update(int i)
-        {
-            PartitionUpdate upd = repairs[i];
-            if (upd == null)
+            this.diffListener = new RowDiffListener()
             {
-                upd = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
-                repairs[i] = upd;
-            }
-            return upd;
-        }
+                public void onPrimaryKeyLivenessInfo(int i, Clustering clustering, LivenessInfo merged, LivenessInfo original)
+                {
+                    if (merged != null && !merged.equals(original))
+                        currentRow(i, clustering).addPrimaryKeyLivenessInfo(merged);
+                }
 
-        private Row.Writer currentRow(int i)
-        {
-            Row.Writer row = currentRows[i];
-            if (row == null)
-            {
-                row = currentClustering == Clustering.STATIC_CLUSTERING ? update(i).staticWriter() : update(i).writer();
-                currentClustering.writeTo(row);
-                currentRows[i] = row;
-            }
-            return row;
-        }
+                public void onDeletion(int i, Clustering clustering, DeletionTime merged, DeletionTime original)
+                {
+                    if (merged != null && !merged.equals(original))
+                        currentRow(i, clustering).addRowDeletion(merged);
+                }
 
-        public void onMergePartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
-        {
-            for (int i = 0; i < versions.length; i++)
-            {
-                DeletionTime version = versions[i];
-                if (mergedDeletion.supersedes(versions[i]))
-                    update(i).addPartitionDeletion(mergedDeletion);
-            }
-        }
+                public void onComplexDeletion(int i, Clustering clustering, ColumnDefinition column, DeletionTime merged, DeletionTime original)
+                {
+                    if (merged != null && !merged.equals(original))
+                        currentRow(i, clustering).addComplexDeletion(column, merged);
+                }
 
-        public void onMergingRows(Clustering clustering,
-                                  LivenessInfo mergedInfo,
-                                  DeletionTime mergedDeletion,
-                                  Row[] versions)
-        {
-            currentClustering = clustering;
-            for (int i = 0; i < versions.length; i++)
-            {
-                Row version = versions[i];
+                public void onCell(int i, Clustering clustering, Cell merged, Cell original)
+                {
+                    if (merged != null && !merged.equals(original))
+                        currentRow(i, clustering).addCell(merged);
+                }
 
-                if (version == null || mergedInfo.supersedes(version.primaryKeyLivenessInfo()))
-                    currentRow(i).writePartitionKeyLivenessInfo(mergedInfo);
+            };
+        }
 
-                if (version == null || mergedDeletion.supersedes(version.deletion()))
-                    currentRow(i).writeRowDeletion(mergedDeletion);
-            }
+        private PartitionUpdate update(int i)
+        {
+            if (repairs[i] == null)
+                repairs[i] = new PartitionUpdate(command.metadata(), partitionKey, columns, 1);
+            return repairs[i];
         }
 
-        public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedCompositeDeletion, DeletionTime[] versions)
+        private Row.Builder currentRow(int i, Clustering clustering)
         {
-            currentColumn = c;
-            for (int i = 0; i < versions.length; i++)
+            if (currentRows[i] == null)
             {
-                DeletionTime version = versions[i] == null ? DeletionTime.LIVE : versions[i];
-                if (mergedCompositeDeletion.supersedes(version))
-                    currentRow(i).writeComplexDeletion(c, mergedCompositeDeletion);
+                currentRows[i] = ArrayBackedRow.sortedBuilder(clustering == Clustering.STATIC_CLUSTERING ? columns.statics : columns.regulars);
+                currentRows[i].newRow(clustering);
             }
+            return currentRows[i];
         }
 
-        public void onMergedCells(Cell mergedCell, Cell[] versions)
+        public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
         {
             for (int i = 0; i < versions.length; i++)
             {
-                Cell version = versions[i];
-                Cell toAdd = version == null ? mergedCell : Cells.diff(mergedCell, version);
-                if (toAdd != null)
-                    toAdd.writeTo(currentRow(i));
+                if (mergedDeletion.supersedes(versions[i]))
+                    update(i).addPartitionDeletion(mergedDeletion);
             }
         }
 
-        public void onRowDone()
+        public void onMergedRows(Row merged, Columns columns, Row[] versions)
         {
+            // If a row was shadowed post merge, it must be by a partition level or range tombstone, and we handle
+            // those cases directly in their respective methods (in other words, it would be inefficient to send a row
+            // deletion as repair when we know we've already sent a partition level or range tombstone that covers it).
+            if (merged.isEmpty())
+                return;
+
+            Rows.diff(merged, columns, versions, diffListener);
             for (int i = 0; i < currentRows.length; i++)
             {
                 if (currentRows[i] != null)
-                    currentRows[i].endOfRow();
+                    update(i).add(currentRows[i].build());
             }
             Arrays.fill(currentRows, null);
         }
@@ -268,12 +255,12 @@
                 if (merged.isClose(isReversed) && markerOpen[i] != null)
                 {
                     Slice.Bound open = markerOpen[i];
-                    Slice.Bound close = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingCloseBound(isReversed).clustering() : merged.clustering();
-                    update(i).addRangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]);
+                    Slice.Bound close = merged.closeBound(isReversed);
+                    update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), markerTime[i]));
                 }
                 if (merged.isOpen(isReversed) && (marker == null || merged.openDeletionTime(isReversed).supersedes(marker.openDeletionTime(isReversed))))
                 {
-                    markerOpen[i] = merged.isBoundary() ? ((RangeTombstoneBoundaryMarker)merged).createCorrespondingOpenBound(isReversed).clustering() : merged.clustering();
+                    markerOpen[i] = merged.openBound(isReversed);
                     markerTime[i] = merged.openDeletionTime(isReversed);
                 }
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
index 6429be0..6c08be0 100644
--- a/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/RangeSliceQueryPager.java
@@ -96,7 +96,7 @@ public class RangeSliceQueryPager extends AbstractQueryPager
         if (last != null)
         {
             lastReturnedKey = key;
-            lastReturnedClustering = last.clustering().takeAlias();
+            lastReturnedClustering = last.clustering();
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
index 6488641..223c3fd 100644
--- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java
@@ -80,6 +80,6 @@ public class SinglePartitionPager extends AbstractQueryPager
     protected void recordLast(DecoratedKey key, Row last)
     {
         if (last != null)
-            lastReturned = last.clustering().takeAlias();
+            lastReturned = last.clustering();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/service/paxos/Commit.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 9a5e619..579c315 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -27,7 +27,6 @@ import java.util.UUID;
 import com.google.common.base.Objects;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
@@ -63,7 +62,8 @@ public class Commit
 
     public static Commit newProposal(UUID ballot, PartitionUpdate update)
     {
-        return new Commit(ballot, updatesWithPaxosTime(update, ballot));
+        update.updateAllTimestamp(UUIDGen.microsTimestamp(ballot));
+        return new Commit(ballot, update);
     }
 
     public static Commit emptyCommit(DecoratedKey key, CFMetaData metadata)
@@ -83,7 +83,6 @@ public class Commit
 
     public Mutation makeMutation()
     {
-        assert update != null;
         return new Mutation(update);
     }
 
@@ -95,10 +94,7 @@ public class Commit
 
         Commit commit = (Commit) o;
 
-        if (!ballot.equals(commit.ballot)) return false;
-        if (!update.equals(commit.update)) return false;
-
-        return true;
+        return ballot.equals(commit.ballot) && update.equals(commit.update);
     }
 
     @Override
@@ -107,46 +103,6 @@ public class Commit
         return Objects.hashCode(ballot, update);
     }
 
-    private static PartitionUpdate updatesWithPaxosTime(PartitionUpdate update, UUID ballot)
-    {
-        long t = UUIDGen.microsTimestamp(ballot);
-        // Using t-1 for tombstones so deletion doesn't trump newly inserted data (#6069)
-        PartitionUpdate newUpdate = new PartitionUpdate(update.metadata(),
-                                                        update.partitionKey(),
-                                                        update.deletionInfo().updateAllTimestamp(t-1),
-                                                        update.columns(),
-                                                        update.rowCount());
-
-        if (!update.staticRow().isEmpty())
-            copyWithUpdatedTimestamp(update.staticRow(), newUpdate.staticWriter(), t);
-
-        for (Row row : update)
-            copyWithUpdatedTimestamp(row, newUpdate.writer(), t);
-
-        return newUpdate;
-    }
-
-    private static void copyWithUpdatedTimestamp(Row row, Row.Writer writer, long timestamp)
-    {
-        Rows.writeClustering(row.clustering(), writer);
-        writer.writePartitionKeyLivenessInfo(row.primaryKeyLivenessInfo().withUpdatedTimestamp(timestamp));
-        writer.writeRowDeletion(row.deletion());
-
-        for (Cell cell : row)
-            writer.writeCell(cell.column(), cell.isCounterCell(), cell.value(), cell.livenessInfo().withUpdatedTimestamp(timestamp), cell.path());
-
-        for (int i = 0; i < row.columns().complexColumnCount(); i++)
-        {
-            ColumnDefinition c = row.columns().getComplex(i);
-            DeletionTime dt = row.getDeletion(c);
-            // We use t-1 to make sure that on inserting a collection literal, the deletion that comes with it does not
-            // end up deleting the inserted data (see #6069)
-            if (!dt.isLive())
-                writer.writeComplexDeletion(c, new SimpleDeletionTime(timestamp-1, dt.localDeletionTime()));
-        }
-        writer.endOfRow();
-    }
-
     @Override
     public String toString()
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReader.java b/src/java/org/apache/cassandra/streaming/StreamReader.java
index 2876f08..ee646aa 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@ -18,7 +18,6 @@
 package org.apache.cassandra.streaming;
 
 import java.io.*;
-import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.util.Collection;
@@ -36,7 +35,6 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTableSimpleIterator;
 import org.apache.cassandra.io.sstable.format.SSTableFormat;
@@ -184,16 +182,13 @@ public class StreamReader
         private Row staticRow;
         private IOException exception;
 
-        private final CounterFilteredRow counterRow;
-
         public StreamDeserializer(CFMetaData metadata, DataInputPlus in, Version version, SerializationHeader header)
         {
             assert version.storeRows() : "We don't allow streaming from pre-3.0 nodes";
             this.metadata = metadata;
             this.in = in;
-            this.helper = new SerializationHelper(version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
+            this.helper = new SerializationHelper(metadata, version.correspondingMessagingVersion(), SerializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
-            this.counterRow = metadata.isCounter() ? new CounterFilteredRow() : null;
         }
 
         public DecoratedKey newPartition() throws IOException
@@ -271,9 +266,7 @@ public class StreamReader
 
         private Row maybeMarkLocalToBeCleared(Row row)
         {
-            return metadata.isCounter()
-                 ? counterRow.setTo(row)
-                 : row;
+            return metadata.isCounter() ? row.markCounterLocalToBeCleared() : row;
         }
 
         public void checkForExceptions() throws IOException
@@ -286,18 +279,4 @@ public class StreamReader
         {
         }
     }
-
-    private static class CounterFilteredRow extends WrappingRow
-    {
-        protected Cell filterCell(Cell cell)
-        {
-            if (!cell.isCounterCell())
-                return cell;
-
-            ByteBuffer marked = CounterContext.instance().markLocalToBeCleared(cell.value());
-            return marked == cell.value()
-                 ? cell
-                 : Cells.create(cell.column(), true, marked, cell.livenessInfo(), cell.path());
-        }
-    }
 }
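
An aside on the MetadataCollector hunk above: the fluent update(...) chain becomes a set of void methods so the collector can be driven through the new PartitionStatisticsCollector interface, and the TTL / local-deletion-time trackers now fall back to the Cell.NO_TTL and Cell.NO_DELETION_TIME sentinels when nothing was collected. The following is a minimal, self-contained sketch of that min/max-tracker shape; the names are simplified stand-ins, not the actual Cassandra classes.

    // Simplified stand-in for MetadataCollector's MinMaxIntTracker: records the
    // min/max of every value it sees, and reports caller-supplied sentinel
    // defaults (e.g. a NO_TTL constant) when no value was ever collected.
    final class MinMaxIntTrackerSketch
    {
        private final int defaultMin, defaultMax;
        private boolean isSet;
        private int min, max;

        MinMaxIntTrackerSketch(int defaultMin, int defaultMax)
        {
            this.defaultMin = defaultMin;
            this.defaultMax = defaultMax;
        }

        void update(int value)
        {
            if (!isSet)
            {
                min = max = value;
                isSet = true;
            }
            else
            {
                min = Math.min(min, value);
                max = Math.max(max, value);
            }
        }

        int min() { return isSet ? min : defaultMin; }
        int max() { return isSet ? max : defaultMax; }
    }

With the NO_TTL sentinel passed for both defaults, a table that never saw an expiring cell reports the sentinel rather than a misleading zero.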
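
The DataResolver rewrite above folds the old per-callback merge handling into a single Rows.diff(merged, columns, versions, diffListener) pass: for each source i, any piece of the merged row that differs from what source i returned is copied into that source's pending repair update. Here is a self-contained sketch of the same diff-and-collect idea over plain maps; the types are hypothetical stand-ins for Cassandra's Row machinery.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Sketch of the diff-listener idea: compare the merged result against each
    // source's answer and collect, per source, the entries that source is missing.
    final class RepairDiffSketch
    {
        static List<Map<String, String>> diff(Map<String, String> merged, List<Map<String, String>> versions)
        {
            List<Map<String, String>> repairs = new ArrayList<>();
            for (int i = 0; i < versions.size(); i++)
                repairs.add(new HashMap<>());

            for (Map.Entry<String, String> e : merged.entrySet())
            {
                for (int i = 0; i < versions.size(); i++)
                {
                    // Mirrors the listener's "merged != null && !merged.equals(original)" checks.
                    if (!e.getValue().equals(versions.get(i).get(e.getKey())))
                        repairs.get(i).put(e.getKey(), e.getValue());
                }
            }
            return repairs;
        }
    }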
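
On the Commit change above: the hand-rolled updatesWithPaxosTime/copyWithUpdatedTimestamp pair is replaced by a single update.updateAllTimestamp(UUIDGen.microsTimestamp(ballot)) call. The removed code documents the convention the replacement is expected to keep: live data is written at the ballot-derived timestamp t, while tombstones get t-1 so that a deletion bundled with an insert (a collection overwrite, say) cannot shadow the data inserted at t (#6069). A tiny sketch of that convention, with hypothetical helper names:

    // The t / t-1 convention from the removed updatesWithPaxosTime helper:
    // cells are written at the ballot's timestamp, deletions one microsecond
    // earlier so they never trump data proposed in the same ballot (#6069).
    final class PaxosTimestampSketch
    {
        static long cellTimestamp(long ballotMicros)     { return ballotMicros; }
        static long deletionTimestamp(long ballotMicros) { return ballotMicros - 1; }

        public static void main(String[] args)
        {
            long t = 1_446_000_000_000_000L; // hypothetical micros derived from a ballot UUID
            System.out.println("cells at " + cellTimestamp(t) + ", tombstones at " + deletionTimestamp(t));
        }
    }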

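Finally, the StreamReader change swaps the CounterFilteredRow wrapper for row.markCounterLocalToBeCleared(). The deleted filterCell method shows the copy-on-write idiom at work (note the marked == cell.value() identity check: a new cell is only allocated when the counter context actually changed). A generic sketch of that idiom, with hypothetical names:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.UnaryOperator;

    // Copy-on-write transform: returns the input list untouched when no element
    // changes, and copies it lazily at the first element that does.
    final class CopyOnWriteSketch
    {
        static <T> List<T> transform(List<T> input, UnaryOperator<T> f)
        {
            List<T> copy = null;
            for (int i = 0; i < input.size(); i++)
            {
                T transformed = f.apply(input.get(i));
                if (copy == null && transformed != input.get(i))
                    copy = new ArrayList<>(input.subList(0, i)); // first change: copy the unchanged prefix
                if (copy != null)
                    copy.add(transformed);
            }
            return copy == null ? input : copy;
        }
    }

Returning the input untouched on the common no-change path keeps the non-counter case allocation-free, which is presumably why the identity check existed in the removed code.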