Repository: cassandra Updated Branches: refs/heads/trunk 8302ef7a8 -> 4e57b8200
Avoid digest mismatches on upgrade to 3.0 patch by slebresne; reviewed by blambov for CASSANDRA-9554 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/782a1c3a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/782a1c3a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/782a1c3a Branch: refs/heads/trunk Commit: 782a1c3aecffb8674338fbd9ceeb56a357ecc3f2 Parents: 627e939 Author: Sylvain Lebresne <[email protected]> Authored: Thu Jul 23 23:42:54 2015 +0200 Committer: Sylvain Lebresne <[email protected]> Committed: Tue Sep 1 10:52:57 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/UpdateParameters.java | 9 +- .../org/apache/cassandra/db/DeletionTime.java | 4 +- .../org/apache/cassandra/db/LegacyLayout.java | 72 +++++++- .../cassandra/db/PartitionRangeReadCommand.java | 13 +- .../org/apache/cassandra/db/ReadCommand.java | 76 ++++++-- .../org/apache/cassandra/db/ReadResponse.java | 28 +-- .../db/SinglePartitionNamesCommand.java | 9 +- .../db/SinglePartitionReadCommand.java | 13 +- .../db/SinglePartitionSliceCommand.java | 7 +- .../UnfilteredPartitionIterators.java | 11 +- .../apache/cassandra/db/rows/AbstractCell.java | 1 - .../db/rows/UnfilteredRowIterators.java | 20 +- .../org/apache/cassandra/repair/Validator.java | 2 +- .../cassandra/service/AbstractReadExecutor.java | 9 + .../cassandra/service/DigestResolver.java | 2 +- .../cassandra/thrift/CassandraServer.java | 5 +- .../cassandra/cache/CacheProviderTest.java | 5 +- .../org/apache/cassandra/db/PartitionTest.java | 82 +++++---- .../rows/DigestBackwardCompatibilityTest.java | 182 +++++++++++++++++++ 20 files changed, 452 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 08b7df0..390255c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.0-beta2 + * Avoid digest mismatch on upgrade to 3.0 (CASSANDRA-9554) * Fix Materialized View builder when adding multiple MVs (CASSANDRA-10156) * Choose better poolingOptions for protocol v4 in cassandra-stress (CASSANDRA-10182) * Fix LWW bug affecting Materialized Views (CASSANDRA-10197) http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index 1cdb64d..045b1e1 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -146,7 +146,14 @@ public class UpdateParameters public void addRowDeletion() { - builder.addRowDeletion(deletionTime); + // For compact tables, with the exception of the static row (of static compact tables), each row only ever has a single column, + // the "compact" one. As such, deleting the row or deleting that single cell is equivalent. We favor the latter however + // because that makes it easier when translating back to the old format layout (for thrift and pre-3.0 backward + // compatibility) as we don't have to special-case the row deletion. This is also in line with what we used to do pre-3.0.
+ if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING) + addTombstone(metadata.compactValueColumn()); + else + builder.addRowDeletion(deletionTime); } public void addTombstone(ColumnDefinition column) throws InvalidRequestException http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/DeletionTime.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DeletionTime.java b/src/java/org/apache/cassandra/db/DeletionTime.java index 3e9ca80..343a6c2 100644 --- a/src/java/org/apache/cassandra/db/DeletionTime.java +++ b/src/java/org/apache/cassandra/db/DeletionTime.java @@ -83,8 +83,10 @@ public class DeletionTime implements Comparable<DeletionTime>, IMeasurableMemory public void digest(MessageDigest digest) { + // localDeletionTime is essentially metadata about the deletion time that tells us when it's ok to purge it. + // It's thus intrinsically local information and shouldn't be part of the digest (which exists for + // cross-node comparisons). FBUtilities.updateWithLong(digest, markedForDeleteAt()); - FBUtilities.updateWithInt(digest, localDeletionTime()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index 628ac75..7b03e46 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.IOException; import java.io.IOError; import java.nio.ByteBuffer; +import java.security.MessageDigest; import java.util.*; import org.apache.cassandra.utils.AbstractIterator; @@ -919,7 +920,6 @@ public abstract class LegacyLayout }; } - public static LegacyAtom readLegacyAtom(CFMetaData metadata, DataInputPlus in, boolean readAllAsDynamic) throws IOException { while (true) @@ -1187,8 +1187,40 @@ public abstract class LegacyLayout this.rangeTombstones = rangeTombstones; this.cells = cells; } - } + public void digest(CFMetaData metadata, MessageDigest digest) + { + for (LegacyCell cell : cells) + { + digest.update(cell.name.encode(metadata).duplicate()); + + if (cell.isCounter()) + CounterContext.instance().updateDigest(digest, cell.value); + else + digest.update(cell.value.duplicate()); + + FBUtilities.updateWithLong(digest, cell.timestamp); + FBUtilities.updateWithByte(digest, cell.serializationFlags()); + + if (cell.isExpiring()) + FBUtilities.updateWithInt(digest, cell.ttl); + + if (cell.isCounter()) + { + // Counters used to have the timestampOfLastDelete field, which we stopped using long ago and which has been hard-coded + // to Long.MIN_VALUE, but it was still taken into account in 2.2 counter digests (to maintain backward compatibility + // in the first place).
+ FBUtilities.updateWithLong(digest, Long.MIN_VALUE); + } + } + + if (partitionDeletion.markedForDeleteAt() != Long.MIN_VALUE) + digest.update(ByteBufferUtil.bytes(partitionDeletion.markedForDeleteAt())); + + if (!rangeTombstones.isEmpty()) + rangeTombstones.updateDigest(digest); + } + } public static class LegacyCellName { @@ -1285,6 +1317,12 @@ public abstract class LegacyLayout */ public static class LegacyCell implements LegacyAtom { + private final static int DELETION_MASK = 0x01; + private final static int EXPIRATION_MASK = 0x02; + private final static int COUNTER_MASK = 0x04; + private final static int COUNTER_UPDATE_MASK = 0x08; + private final static int RANGE_TOMBSTONE_MASK = 0x10; + public enum Kind { REGULAR, EXPIRING, DELETED, COUNTER } public final Kind kind; @@ -1337,6 +1375,17 @@ public abstract class LegacyLayout return new LegacyCell(Kind.COUNTER, name, value, FBUtilities.timestampMicros(), Cell.NO_DELETION_TIME, Cell.NO_TTL); } + public byte serializationFlags() + { + if (isExpiring()) + return EXPIRATION_MASK; + if (isTombstone()) + return DELETION_MASK; + if (isCounter()) + return COUNTER_MASK; + return 0; + } + public ClusteringPrefix clustering() { return name.clustering; @@ -1973,6 +2022,25 @@ public abstract class LegacyLayout delTimes[i] = delTime; } + public void updateDigest(MessageDigest digest) + { + ByteBuffer longBuffer = ByteBuffer.allocate(8); + for (int i = 0; i < size; i++) + { + for (int j = 0; j < starts[i].bound.size(); j++) + digest.update(starts[i].bound.get(j).duplicate()); + if (starts[i].collectionName != null) + digest.update(starts[i].collectionName.name.bytes.duplicate()); + for (int j = 0; j < ends[i].bound.size(); j++) + digest.update(ends[i].bound.get(j).duplicate()); + if (ends[i].collectionName != null) + digest.update(ends[i].collectionName.name.bytes.duplicate()); + + longBuffer.putLong(0, markedAts[i]); + digest.update(longBuffer.array(), 0, 8); + } + } + public void serialize(DataOutputPlus out, CFMetaData metadata) throws IOException { out.writeInt(size); http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 2ba45e7..da62557 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -57,6 +57,7 @@ public class PartitionRangeReadCommand extends ReadCommand private int oldestUnrepairedTombstone = Integer.MAX_VALUE; public PartitionRangeReadCommand(boolean isDigest, + int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, @@ -65,7 +66,7 @@ public class PartitionRangeReadCommand extends ReadCommand DataLimits limits, DataRange dataRange) { - super(Kind.PARTITION_RANGE, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + super(Kind.PARTITION_RANGE, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); this.dataRange = dataRange; } @@ -76,7 +77,7 @@ public class PartitionRangeReadCommand extends ReadCommand DataLimits limits, DataRange dataRange) { - this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange); + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, dataRange); } /** @@ -114,12 +115,12 @@ public class 
PartitionRangeReadCommand extends ReadCommand public PartitionRangeReadCommand forSubRange(AbstractBounds<PartitionPosition> range) { - return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range)); + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange().forSubRange(range)); } public PartitionRangeReadCommand copy() { - return new PartitionRangeReadCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange()); + return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), dataRange()); } public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) @@ -302,11 +303,11 @@ public class PartitionRangeReadCommand extends ReadCommand private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException { DataRange range = DataRange.serializer.deserialize(in, version, metadata); - return new PartitionRangeReadCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range); + return new PartitionRangeReadCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, range); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 1d5d477..0bc8cea 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -71,11 +71,13 @@ public abstract class ReadCommand implements ReadQuery private final DataLimits limits; private boolean isDigestQuery; + // if a digest query, the version for which the digest is expected. Ignored if not a digest. 
+ private int digestVersion; private final boolean isForThrift; protected static abstract class SelectionDeserializer { - public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException; + public abstract ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException; } protected enum Kind @@ -93,6 +95,7 @@ public abstract class ReadCommand implements ReadQuery protected ReadCommand(Kind kind, boolean isDigestQuery, + int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, @@ -102,6 +105,7 @@ public abstract class ReadCommand implements ReadQuery { this.kind = kind; this.isDigestQuery = isDigestQuery; + this.digestVersion = digestVersion; this.isForThrift = isForThrift; this.metadata = metadata; this.nowInSec = nowInSec; @@ -192,6 +196,17 @@ public abstract class ReadCommand implements ReadQuery } /** + * If the query is a digest one, the requested digest version. + * + * @return the requested digest version if the query is a digest one; otherwise the returned + * value is undefined. + */ + public int digestVersion() + { + return digestVersion; + } + + /** * Sets whether this command should be a digest one or not. * * @param isDigestQuery whether the command should be set as a digest one or not. @@ -204,6 +219,22 @@ public abstract class ReadCommand implements ReadQuery } /** + * Sets the digest version, for when a digest for that command is requested. + * <p> + * Note that we allow setting this independently of setting the command as a digest query as + * this allows us to use the command as a carrier of the digest version even if we only call + * setIsDigestQuery on some copy of it. + * + * @param digestVersion the version to use for the digest if this command is used for a digest query. + * @return this read command. + */ + public ReadCommand setDigestVersion(int digestVersion) + { + this.digestVersion = digestVersion; + return this; + } + + /** * Whether this query is for thrift or not. * * @return whether this query is for thrift. @@ -252,7 +283,7 @@ public abstract class ReadCommand implements ReadQuery public ReadResponse createResponse(UnfilteredPartitionIterator iterator, ColumnFilter selection) { return isDigestQuery() - ? ReadResponse.createDigestResponse(iterator) + ? ReadResponse.createDigestResponse(iterator, digestVersion) : ReadResponse.createDataResponse(iterator, selection); } @@ -481,6 +512,8 @@ public abstract class ReadCommand implements ReadQuery out.writeByte(command.kind.ordinal()); out.writeByte(digestFlag(command.isDigestQuery()) | thriftFlag(command.isForThrift())); + if (command.isDigestQuery()) + out.writeVInt(command.digestVersion()); CFMetaData.serializer.serialize(command.metadata(), out, version); out.writeInt(command.nowInSec()); ColumnFilter.serializer.serialize(command.columnFilter(), out, version); @@ -499,13 +532,14 @@ public abstract class ReadCommand implements ReadQuery int flags = in.readByte(); boolean isDigest = isDigest(flags); boolean isForThrift = isForThrift(flags); + int digestVersion = isDigest ?
(int)in.readVInt() : 0; CFMetaData metadata = CFMetaData.serializer.deserialize(in, version); int nowInSec = in.readInt(); ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); DataLimits limits = DataLimits.serializer.deserialize(in, version); - return kind.selectionDeserializer.deserialize(in, version, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + return kind.selectionDeserializer.deserialize(in, version, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); } public long serializedSize(ReadCommand command, int version) @@ -514,6 +548,7 @@ public abstract class ReadCommand implements ReadQuery assert version >= MessagingService.VERSION_30; return 2 // kind + flags + + (command.isDigestQuery() ? TypeSizes.sizeofVInt(command.digestVersion()) : 0) + CFMetaData.serializer.serializedSize(command.metadata(), version) + TypeSizes.sizeof(command.nowInSec()) + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) @@ -704,7 +739,7 @@ public abstract class ReadCommand implements ReadQuery else limits = DataLimits.cqlLimits(maxResults); - return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter)); + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter)); } static void serializeRowFilter(DataOutputPlus out, RowFilter rowFilter) throws IOException @@ -814,7 +849,7 @@ public abstract class ReadCommand implements ReadQuery ClusteringIndexSliceFilter sliceFilter = LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata); DataRange newRange = new DataRange(command.dataRange().keyRange(), sliceFilter); return new PartitionRangeReadCommand( - command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(), + command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), command.columnFilter(), command.rowFilter(), command.limits(), newRange); } @@ -963,10 +998,18 @@ public abstract class ReadCommand implements ReadQuery limits = limits.forPaging(maxResults); - // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it - // missing without any problems, so we can safely always set "inclusive" to false in the data range - DataRange dataRange = new DataRange(keyRange, filter).forPaging(keyRange, metadata.comparator, startClustering, false); - return new PartitionRangeReadCommand(false, true, metadata, nowInSec, selection, rowFilter, limits, dataRange); + // The pagedRangeCommand is used in pre-3.0 for both the first page and the following ones. On the first page, the startBound will be + // the start of the overall slice and will not be a proper Clustering. So detect that case and just return a non-paging DataRange, which + // is what 3.0 does. 
+ DataRange dataRange = new DataRange(keyRange, filter); + Slices slices = filter.requestedSlices(); + if (startBound != LegacyLayout.LegacyBound.BOTTOM && !startBound.bound.equals(slices.get(0).start())) + { + // pre-3.0 nodes normally expect pages to include the last cell from the previous page, but they handle it + // missing without any problems, so we can safely always set "inclusive" to false in the data range + dataRange = dataRange.forPaging(keyRange, metadata.comparator, startBound.getAsClustering(metadata), false); + } + return new PartitionRangeReadCommand(false, 0, true, metadata, nowInSec, selection, rowFilter, limits, dataRange); } public long serializedSize(ReadCommand command, int version) @@ -1065,9 +1108,9 @@ public abstract class ReadCommand implements ReadQuery switch (msgType) { case GET_BY_NAMES: - return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds); + return deserializeNamesCommand(in, isDigest, metadata, dk, nowInSeconds, version); case GET_SLICES: - return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds); + return deserializeSliceCommand(in, isDigest, metadata, dk, nowInSeconds, version); default: throw new AssertionError(); } @@ -1101,7 +1144,6 @@ public abstract class ReadCommand implements ReadQuery serializeNamesFilter(command, command.clusteringIndexFilter(), out); } - private static void serializeNamesFilter(ReadCommand command, ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException { PartitionColumns columns = command.columnFilter().fetchedColumns(); @@ -1157,13 +1199,13 @@ public abstract class ReadCommand implements ReadQuery return size + TypeSizes.sizeof(true); // countCql3Rows } - private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException + private SinglePartitionNamesCommand deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException { Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter = deserializeNamesSelectionAndFilter(in, metadata); // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift return new SinglePartitionNamesCommand( - isDigest, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE, + isDigest, version, true, metadata, nowInSeconds, selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE, key, selectionAndFilter.right); } @@ -1243,7 +1285,7 @@ public abstract class ReadCommand implements ReadQuery out.writeInt(compositesToGroup); } - private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds) throws IOException + private SinglePartitionSliceCommand deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException { ClusteringIndexSliceFilter filter = deserializeSlicePartitionFilter(in, metadata); int count = in.readInt(); @@ -1266,7 +1308,7 @@ public abstract class ReadCommand implements ReadQuery limits = DataLimits.cqlLimits(count); // messages from old nodes will expect the thrift format, so always use 'true' for isForThrift - return new SinglePartitionSliceCommand(isDigest, true, metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter); + return new SinglePartitionSliceCommand(isDigest, version, true, 
metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter); } private long serializedSliceCommandSize(SinglePartitionSliceCommand command) @@ -1423,7 +1465,7 @@ public abstract class ReadCommand implements ReadQuery ClusteringIndexNamesFilter filter = ((SinglePartitionNamesCommand) command).clusteringIndexFilter(); ClusteringIndexSliceFilter sliceFilter = convertNamesFilterToSliceFilter(filter, metadata); return new SinglePartitionSliceCommand( - command.isDigestQuery(), command.isForThrift(), metadata, command.nowInSec(), + command.isDigestQuery(), command.digestVersion(), command.isForThrift(), metadata, command.nowInSec(), command.columnFilter(), command.rowFilter(), command.limits(), command.partitionKey(), sliceFilter); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/ReadResponse.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 21f6106..547e7f4 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -70,20 +70,20 @@ public abstract class ReadResponse return new RemoteDataResponse(LocalDataResponse.build(data, selection)); } - public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data) + public static ReadResponse createDigestResponse(UnfilteredPartitionIterator data, int version) { - return new DigestResponse(makeDigest(data)); + return new DigestResponse(makeDigest(data, version)); } public abstract UnfilteredPartitionIterator makeIterator(CFMetaData metadata, ReadCommand command); public abstract ByteBuffer digest(CFMetaData metadata, ReadCommand command); - public abstract boolean isDigestQuery(); + public abstract boolean isDigestResponse(); - protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator) + protected static ByteBuffer makeDigest(UnfilteredPartitionIterator iterator, int version) { MessageDigest digest = FBUtilities.threadLocalMD5Digest(); - UnfilteredPartitionIterators.digest(iterator, digest); + UnfilteredPartitionIterators.digest(iterator, digest, version); return ByteBuffer.wrap(digest.digest()); } @@ -105,10 +105,14 @@ public abstract class ReadResponse public ByteBuffer digest(CFMetaData metadata, ReadCommand command) { + // We assume that the digest is in the proper version, which, barring a bug, should be true since this is called with + // ReadCommand.digestVersion() as argument and that's also what we use to produce the digest in the first place. + // Validating that it's the proper digest in this method would require sending back the digest version along with the + // digest, which would waste bandwidth for little gain.
return digest; } - public boolean isDigestQuery() + public boolean isDigestResponse() { return true; } @@ -201,11 +205,11 @@ public abstract class ReadResponse { try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) { - return makeDigest(iterator); + return makeDigest(iterator, command.digestVersion()); } } - public boolean isDigestQuery() + public boolean isDigestResponse() { return false; } @@ -268,11 +272,11 @@ public abstract class ReadResponse { try (UnfilteredPartitionIterator iterator = makeIterator(metadata, command)) { - return makeDigest(iterator); + return makeDigest(iterator, command.digestVersion()); } } - public boolean isDigestQuery() + public boolean isDigestResponse() { return false; } @@ -284,7 +288,6 @@ public abstract class ReadResponse { boolean isDigest = response instanceof DigestResponse; ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : ByteBufferUtil.EMPTY_BYTE_BUFFER; - if (version < MessagingService.VERSION_30) { out.writeInt(digest.remaining()); @@ -310,9 +313,6 @@ public abstract class ReadResponse ByteBufferUtil.writeWithVIntLength(digest, out); if (!isDigest) { - // Note that we can only get there if version == 3.0, which is the current_version. When we'll change the - // version, we'll have to deserialize/re-serialize the data to be in the proper version. - assert version == MessagingService.VERSION_30; ByteBuffer data = ((DataResponse)response).data; ByteBufferUtil.writeWithVIntLength(data, out); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java index f40da5b..cee3fc4 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java @@ -46,6 +46,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus { private int oldestUnrepairedDeletionTime = Integer.MAX_VALUE; protected SinglePartitionNamesCommand(boolean isDigest, + int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, @@ -55,7 +56,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus DecoratedKey partitionKey, ClusteringIndexNamesFilter clusteringIndexFilter) { - super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } public SinglePartitionNamesCommand(CFMetaData metadata, @@ -66,7 +67,7 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus DecoratedKey partitionKey, ClusteringIndexNamesFilter clusteringIndexFilter) { - this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } public SinglePartitionNamesCommand(CFMetaData metadata, @@ -77,12 +78,12 @@ public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<Clus ByteBuffer key, ClusteringIndexNamesFilter clusteringIndexFilter) { - this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), 
clusteringIndexFilter); + this(metadata, nowInSec, columnFilter, rowFilter, limits, metadata.decorateKey(key), clusteringIndexFilter); } public SinglePartitionNamesCommand copy() { - return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); + return new SinglePartitionNamesCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index ca135f8..7b62f5a 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -50,6 +50,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter private final F clusteringIndexFilter; protected SinglePartitionReadCommand(boolean isDigest, + int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, @@ -59,7 +60,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter DecoratedKey partitionKey, F clusteringIndexFilter) { - super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); assert partitionKey.getPartitioner() == metadata.partitioner; this.partitionKey = partitionKey; this.clusteringIndexFilter = clusteringIndexFilter; @@ -113,10 +114,10 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter ClusteringIndexFilter clusteringIndexFilter) { if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter) - return new SinglePartitionSliceCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter); + return new SinglePartitionSliceCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter); assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter; - return new SinglePartitionNamesCommand(false, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter); + return new SinglePartitionNamesCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter); } /** @@ -506,15 +507,15 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter private static class Deserializer extends SelectionDeserializer { - public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) throws IOException { DecoratedKey key = 
metadata.decorateKey(metadata.getKeyValidator().readValue(in)); ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); if (filter instanceof ClusteringIndexNamesFilter) - return new SinglePartitionNamesCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter); + return new SinglePartitionNamesCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter); else - return new SinglePartitionSliceCommand(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter); + return new SinglePartitionSliceCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java index 2dbf7b1..27aab62 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java @@ -44,6 +44,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus private int oldestUnrepairedTombstone = Integer.MAX_VALUE; public SinglePartitionSliceCommand(boolean isDigest, + int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, @@ -53,7 +54,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus DecoratedKey partitionKey, ClusteringIndexSliceFilter clusteringIndexFilter) { - super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + super(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } public SinglePartitionSliceCommand(CFMetaData metadata, @@ -64,7 +65,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus DecoratedKey partitionKey, ClusteringIndexSliceFilter clusteringIndexFilter) { - this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + this(false, 0, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); } /** @@ -118,7 +119,7 @@ public class SinglePartitionSliceCommand extends SinglePartitionReadCommand<Clus public SinglePartitionSliceCommand copy() { - return new SinglePartitionSliceCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); + return new SinglePartitionSliceCommand(isDigestQuery(), digestVersion(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter()); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index f7ee5ee..900b17a 100644 --- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -319,7 +319,14 @@ public abstract class UnfilteredPartitionIterators }; } - public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest) + /** + * Digests the provided iterator. + * + * @param iterator the iterator to digest. + * @param digest the {@code MessageDigest} to use for the digest. + * @param version the messaging protocol version to use when producing the digest. + */ + public static void digest(UnfilteredPartitionIterator iterator, MessageDigest digest, int version) { try (UnfilteredPartitionIterator iter = iterator) { @@ -327,7 +334,7 @@ public abstract class UnfilteredPartitionIterators { try (UnfilteredRowIterator partition = iter.next()) { - UnfilteredRowIterators.digest(partition, digest); + UnfilteredRowIterators.digest(partition, digest, version); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/rows/AbstractCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/AbstractCell.java b/src/java/org/apache/cassandra/db/rows/AbstractCell.java index f53322a..e804b7a 100644 --- a/src/java/org/apache/cassandra/db/rows/AbstractCell.java +++ b/src/java/org/apache/cassandra/db/rows/AbstractCell.java @@ -44,7 +44,6 @@ public abstract class AbstractCell extends Cell { digest.update(value().duplicate()); FBUtilities.updateWithLong(digest, timestamp()); - FBUtilities.updateWithInt(digest, localDeletionTime()); FBUtilities.updateWithInt(digest, ttl()); FBUtilities.updateWithBoolean(digest, isCounterCell()); if (path() != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 48e00f9..477eac9 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -29,6 +29,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; @@ -157,12 +158,21 @@ public abstract class UnfilteredRowIterators }; } - public static void digest(UnfilteredRowIterator iterator, MessageDigest digest) + /** + * Digests the partition represented by the provided iterator. + * + * @param iterator the iterator to digest. + * @param digest the {@code MessageDigest} to use for the digest. + * @param version the messaging protocol version to use when producing the digest. + */ + public static void digest(UnfilteredRowIterator iterator, MessageDigest digest, int version) { - // TODO: we're not computing digest the same way that old nodes. This - means we'll have digest mismatches during upgrade.
We should pass the messaging version of - the node this is for (which might mean computing the digest last, and won't work - for schema (where we announce the version through gossip to everyone)) + if (version < MessagingService.VERSION_30) + { + LegacyLayout.fromUnfilteredRowIterator(iterator).digest(iterator.metadata(), digest); + return; + } + digest.update(iterator.partitionKey().getKey().duplicate()); iterator.partitionLevelDeletion().digest(digest); iterator.columns().digest(digest); http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/repair/Validator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java index 7d6c787..d206305 100644 --- a/src/java/org/apache/cassandra/repair/Validator.java +++ b/src/java/org/apache/cassandra/repair/Validator.java @@ -209,7 +209,7 @@ public class Validator implements Runnable validated++; // MerkleTree uses XOR internally, so we want lots of output bits here CountingDigest digest = new CountingDigest(FBUtilities.newMessageDigest("SHA-256")); - UnfilteredRowIterators.digest(partition, digest); + UnfilteredRowIterators.digest(partition, digest, MessagingService.current_version); // only return new hash for merkle tree in case digest was updated - see CASSANDRA-8979 return digest.count > 0 ? new MerkleTree.RowHash(partition.partitionKey().getToken(), digest.digest(), digest.count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 16a3e6e..41d7bc6 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -69,6 +69,15 @@ public abstract class AbstractReadExecutor this.targetReplicas = targetReplicas; this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas); this.traceState = Tracing.instance.get(); + + // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes + // know how to produce older digests but the reverse is not true. + // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once + // we stop being compatible with pre-3.0 nodes.
+ int digestVersion = MessagingService.current_version; + for (InetAddress replica : targetReplicas) + digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica)); + command.setDigestVersion(digestVersion); } protected void makeDataRequests(Iterable<InetAddress> endpoints) http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/service/DigestResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DigestResolver.java b/src/java/org/apache/cassandra/service/DigestResolver.java index db8adf3..572df6f 100644 --- a/src/java/org/apache/cassandra/service/DigestResolver.java +++ b/src/java/org/apache/cassandra/service/DigestResolver.java @@ -38,7 +38,7 @@ public class DigestResolver extends ResponseResolver public void preprocess(MessageIn<ReadResponse> message) { super.preprocess(message); - if (dataResponse == null && !message.payload.isDigestQuery()) + if (dataResponse == null && !message.payload.isDigestResponse()) dataResponse = message.payload; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index fc1b638..038384e 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -1513,6 +1513,7 @@ public class CassandraServer implements Cassandra.Iface ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, predicate); DataLimits limits = getLimits(range.count, metadata.isSuper() && !column_parent.isSetSuper_column(), predicate); PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false, + 0, true, metadata, nowInSec, @@ -1605,6 +1606,7 @@ public class CassandraServer implements Cassandra.Iface ? new Clustering(start_column) : LegacyLayout.decodeCellName(metadata, start_column).clustering; PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false, + 0, true, metadata, nowInSec, @@ -1695,6 +1697,7 @@ public class CassandraServer implements Cassandra.Iface ClusteringIndexFilter filter = toInternalFilter(metadata, column_parent, column_predicate); DataLimits limits = getLimits(index_clause.count, metadata.isSuper() && !column_parent.isSetSuper_column(), column_predicate); PartitionRangeReadCommand cmd = new PartitionRangeReadCommand(false, + 0, true, metadata, nowInSec, @@ -2511,7 +2514,7 @@ public class CassandraServer implements Cassandra.Iface // We want to know if the partition exists, so just fetch a single cell. ClusteringIndexSliceFilter filter = new ClusteringIndexSliceFilter(Slices.ALL, false); DataLimits limits = DataLimits.thriftLimits(1, 1); - return new SinglePartitionSliceCommand(false, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter); + return new SinglePartitionSliceCommand(false, 0, true, metadata, nowInSec, ColumnFilter.all(metadata), RowFilter.NONE, limits, key, filter); } // Gather the clustering for the expected values and query those. 
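A note on how the digest versioning above fits together: the coordinator (AbstractReadExecutor) stamps each read with the smallest digest version among the replicas it will contact, because a 3.0 node can produce the 2.2 digest format but a 2.2 node cannot produce the 3.0 one. A minimal sketch of the end-to-end flow, reusing the names from the constructor and serializer code above:

    // Coordinator side: negotiate the digest version down to the oldest target replica.
    int digestVersion = MessagingService.current_version;
    for (InetAddress replica : targetReplicas)
        digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
    command.setDigestVersion(digestVersion);

    // Replica side: ReadCommand.createResponse() then calls
    // ReadResponse.createDigestResponse(iterator, digestVersion), and
    // UnfilteredRowIterators.digest(...) routes any version < VERSION_30
    // through LegacyLayout so the hashed bytes match what a 2.x node computes.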
http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 5030029..21a41c4 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.FBUtilities; @@ -96,8 +97,8 @@ public class CacheProviderTest { MessageDigest d1 = MessageDigest.getInstance("MD5"); MessageDigest d2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1); - UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2); + UnfilteredRowIterators.digest(((CachedBTreePartition) one).unfilteredIterator(), d1, MessagingService.current_version); + UnfilteredRowIterators.digest(((CachedBTreePartition) two).unfilteredIterator(), d2, MessagingService.current_version); assertTrue(MessageDigest.isEqual(d1.digest(), d2.digest())); } catch (NoSuchAlgorithmException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/db/PartitionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/PartitionTest.java b/test/unit/org/apache/cassandra/db/PartitionTest.java index f0a63a8..623ff0e 100644 --- a/test/unit/org/apache/cassandra/db/PartitionTest.java +++ b/test/unit/org/apache/cassandra/db/PartitionTest.java @@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse; public class PartitionTest { - static int version = MessagingService.current_version; private static final String KEYSPACE1 = "Keyspace1"; private static final String CF_STANDARD1 = "Standard1"; private static final String CF_TENCOL = "TenColumns"; @@ -117,39 +116,58 @@ public class PartitionTest @Test public void testDigest() throws NoSuchAlgorithmException { + testDigest(MessagingService.current_version); + } + + @Test + public void testLegacyDigest() throws NoSuchAlgorithmException + { + testDigest(MessagingService.VERSION_22); + } + + public void testDigest(int version) throws NoSuchAlgorithmException + { ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_TENCOL); - RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 5, "key1").clustering("c").add("val", "val1"); - for (int i = 0; i < 10; i++) - builder.add("val" + i, "val" + i); - builder.build().applyUnsafe(); - new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe(); - - ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build()); - ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - - MessageDigest digest1 = MessageDigest.getInstance("MD5"); - MessageDigest digest2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2); 
- assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); - - p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - digest1 = MessageDigest.getInstance("MD5"); - digest2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2); - assertTrue(Arrays.equals(digest1.digest(), digest2.digest())); - - p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - RowUpdateBuilder.deleteRow(cfs.metadata, 6, "key2", "c").applyUnsafe(); - p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); - digest1 = MessageDigest.getInstance("MD5"); - digest2 = MessageDigest.getInstance("MD5"); - UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1); - UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2); - assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); + try + { + RowUpdateBuilder builder = new RowUpdateBuilder(cfs.metadata, 5, "key1").clustering("c").add("val", "val1"); + for (int i = 0; i < 10; i++) + builder.add("val" + i, "val" + i); + builder.build().applyUnsafe(); + + new RowUpdateBuilder(cfs.metadata, 5, "key2").clustering("c").add("val", "val2").build().applyUnsafe(); + + ImmutableBTreePartition p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key1").build()); + ImmutableBTreePartition p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + + MessageDigest digest1 = MessageDigest.getInstance("MD5"); + MessageDigest digest2 = MessageDigest.getInstance("MD5"); + UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); + + p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + digest1 = MessageDigest.getInstance("MD5"); + digest2 = MessageDigest.getInstance("MD5"); + UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + assertTrue(Arrays.equals(digest1.digest(), digest2.digest())); + + p1 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + RowUpdateBuilder.deleteRow(cfs.metadata, 6, "key2", "c").applyUnsafe(); + p2 = Util.getOnlyPartitionUnfiltered(Util.cmd(cfs, "key2").build()); + digest1 = MessageDigest.getInstance("MD5"); + digest2 = MessageDigest.getInstance("MD5"); + UnfilteredRowIterators.digest(p1.unfilteredIterator(), digest1, version); + UnfilteredRowIterators.digest(p2.unfilteredIterator(), digest2, version); + assertFalse(Arrays.equals(digest1.digest(), digest2.digest())); + } + finally + { + cfs.truncateBlocking(); + } } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/782a1c3a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java new file mode 100644 index 0000000..5503cfb --- /dev/null +++ b/test/unit/org/apache/cassandra/db/rows/DigestBackwardCompatibilityTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; +import java.util.*; +import java.security.MessageDigest; + +import org.junit.Test; + +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.CQLTester; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.context.CounterContext; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.CounterId; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; + +/** + * Test that digests for pre-3.0 versions are properly computed (they match the value computed on pre-3.0 nodes). + * + * The concrete 'hard-coded' digests this file tests against have been generated on a 2.2 node using basically + * the same test file but with 2 modifications: + * 1. readAndDigest is modified to work on 2.2 (the actual modification is in the method as a comment) + * 2. the assertions are replaced by simple println() of the generated digest. + * + * Note that we only compare against 2.2 since digests should be fixed across versions before 3.0 (anything else + * would be a bug in those previous versions). + */ +public class DigestBackwardCompatibilityTest extends CQLTester +{ + private ByteBuffer readAndDigest(String partitionKey) + { + /* + * In 2.2, this must be replaced by: + * ColumnFamily partition = getCurrentColumnFamilyStore().getColumnFamily(QueryFilter.getIdentityFilter(Util.dk(partitionKey), currentTable(), System.currentTimeMillis())); + * return ColumnFamily.digest(partition); + */ + + ImmutableBTreePartition partition = Util.getOnlyPartitionUnfiltered(Util.cmd(getCurrentColumnFamilyStore(), partitionKey).build()); + MessageDigest digest = FBUtilities.threadLocalMD5Digest(); + UnfilteredRowIterators.digest(partition.unfilteredIterator(), digest, MessagingService.VERSION_22); + return ByteBuffer.wrap(digest.digest()); + } + + private void assertDigest(String expected, ByteBuffer actual) + { + String toTest = ByteBufferUtil.bytesToHex(actual); + assertEquals(String.format("[digest from 2.2] %s != %s [digest from 3.0]", expected, toTest), expected, toTest); + } + + @Test + public void testCQLTable() throws Throwable + { + createTable("CREATE TABLE %s (k text, t int, v1 text, v2 int, PRIMARY KEY (k, t))"); + + String key = "someKey"; + int N = 10; + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s(k, t, v1, v2) VALUES (?, ?, ?, ?) USING TIMESTAMP ?
AND TTL ?", key, i, "v" + i, i, 1L, 200); + + // ColumnFamily(table_0 [0::false:0@1!200,0:v1:false:2@1!200,0:v2:false:4@1!200,1::false:0@1!200,1:v1:false:2@1!200,1:v2:false:4@1!200,2::false:0@1!200,2:v1:false:2@1!200,2:v2:false:4@1!200,3::false:0@1!200,3:v1:false:2@1!200,3:v2:false:4@1!200,4::false:0@1!200,4:v1:false:2@1!200,4:v2:false:4@1!200,5::false:0@1!200,5:v1:false:2@1!200,5:v2:false:4@1!200,6::false:0@1!200,6:v1:false:2@1!200,6:v2:false:4@1!200,7::false:0@1!200,7:v1:false:2@1!200,7:v2:false:4@1!200,8::false:0@1!200,8:v1:false:2@1!200,8:v2:false:4@1!200,9::false:0@1!200,9:v1:false:2@1!200,9:v2:false:4@1!200,]) + assertDigest("aa608035cf6574a97061b5c166b64939", readAndDigest(key)); + + // This is a cell deletion + execute("DELETE v1 FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 2L, key, 2); + + // This is a range tombstone + execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 3L, key, 4); + + // This is a partition level deletion (but we use an older tombstone so it doesn't get rid of everything and keeps the test interesting) + execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ?", 0L, key); + + // ColumnFamily(table_0 -{deletedAt=0, localDeletion=1441012270, ranges=[4:_-4:!, deletedAt=3, localDeletion=1441012270]}- [0::false:0@1!200,0:v1:false:2@1!200,0:v2:false:4@1!200,1::false:0@1!200,1:v1:false:2@1!200,1:v2:false:4@1!200,2::false:0@1!200,2:v1:true:4@2,2:v2:false:4@1!200,3::false:0@1!200,3:v1:false:2@1!200,3:v2:false:4@1!200,5::false:0@1!200,5:v1:false:2@1!200,5:v2:false:4@1!200,6::false:0@1!200,6:v1:false:2@1!200,6:v2:false:4@1!200,7::false:0@1!200,7:v1:false:2@1!200,7:v2:false:4@1!200,8::false:0@1!200,8:v1:false:2@1!200,8:v2:false:4@1!200,9::false:0@1!200,9:v1:false:2@1!200,9:v2:false:4@1!200,]) + assertDigest("b5f38d2dc7b917d221f98ab1641f82bf", readAndDigest(key)); + } + + @Test + public void testCompactTable() throws Throwable + { + createTable("CREATE TABLE %s (k text, t int, v text, PRIMARY KEY (k, t)) WITH COMPACT STORAGE"); + + String key = "someKey"; + int N = 10; + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s(k, t, v) VALUES (?, ?, ?) USING TIMESTAMP ? AND TTL ?", key, i, "v" + i, 1L, 200); + + assertDigest("44785ddd7c62c73287b448b6063645e5", readAndDigest(key)); + + // This is a cell deletion + execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ? AND t = ?", 2L, key, 2); + + // This is a partition level deletion (but we use an older tombstone so it doesn't get rid of everything and keeps the test interesting) + execute("DELETE FROM %s USING TIMESTAMP ? WHERE k = ?", 0L, key); + + assertDigest("55d9bd6335276395d83b18eb17f9abe7", readAndDigest(key)); + } + + @Test + public void testStaticCompactTable() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, v1 text, v2 int) WITH COMPACT STORAGE"); + + String key = "someKey"; + execute("INSERT INTO %s(k, v1, v2) VALUES (?, ?, ?) 
USING TIMESTAMP ?", key, "v", 0, 1L); + + assertDigest("d2080f9f57d6edf92da1fdaaa76573d3", readAndDigest(key)); + } + + @Test + public void testTableWithCollection() throws Throwable + { + createTable("CREATE TABLE %s (k text PRIMARY KEY, m map<text, text>)"); + + String key = "someKey"; + + execute("INSERT INTO %s(k, m) VALUES (?, { 'foo' : 'value1', 'bar' : 'value2' }) USING TIMESTAMP ?", key, 1L); + + // ColumnFamily(table_2 -{deletedAt=-9223372036854775808, localDeletion=2147483647, ranges=[m:_-m:!, deletedAt=0, localDeletion=1441012271]}- [:false:0@1,m:626172:false:6@1,m:666f6f:false:6@1,]) + assertDigest("708f3fc8bc8149cc3513eef300bf0182", readAndDigest(key)); + + // This is a collection range tombstone + execute("DELETE m FROM %s USING TIMESTAMP ? WHERE k = ?", 2L, key); + + // ColumnFamily(table_2 -{deletedAt=-9223372036854775808, localDeletion=2147483647, ranges=[m:_-m:!, deletedAt=2, localDeletion=1441012271]}- [:false:0@1,]) + assertDigest("f39937fc3ed96956ef507e81717fa5cd", readAndDigest(key)); + } + + @Test + public void testCounterTable() throws Throwable + { + /* + * We can't use CQL to insert counters as both the timestamp and counter ID are automatically assigned and unpredictable. + * So we need to built it ourselves in a way that is totally equivalent between 2.2 and 3.0 which makes the test a little + * bit less readable. In any case, the code to generate the equivalent mutation on 2.2 is: + * ColumnFamily cf = ArrayBackedSortedColumns.factory.create(getCurrentColumnFamilyStore().metadata); + * ByteBuffer value = CounterContext.instance().createGlobal(CounterId.fromInt(1), 1L, 42L); + * cf.addColumn(new BufferCounterCell(CellNames.simpleSparse(new ColumnIdentifier("c", true)) , value, 0L, Long.MIN_VALUE)); + * new Mutation(KEYSPACE, ByteBufferUtil.bytes(key), cf).applyUnsafe(); + * + * Also note that we use COMPACT STORAGE only because it has no bearing on the test and was slightly easier in 2.2 to create + * the mutation. + */ + + createTable("CREATE TABLE %s (k text PRIMARY KEY, c counter) WITH COMPACT STORAGE"); + + String key = "someKey"; + + CFMetaData metadata = getCurrentColumnFamilyStore().metadata; + ColumnDefinition column = metadata.getColumnDefinition(ByteBufferUtil.bytes("c")); + ByteBuffer value = CounterContext.instance().createGlobal(CounterId.fromInt(1), 1L, 42L); + Row row = BTreeRow.singleCellRow(Clustering.STATIC_CLUSTERING, BufferCell.live(metadata, column, 0L, value)); + + new Mutation(PartitionUpdate.singleRowUpdate(metadata, Util.dk(key), row)).applyUnsafe(); + + assertDigest("3a5f7b48c320538b4cd2f829e05c6db3", readAndDigest(key)); + } +}
