Updated Branches: refs/heads/cassandra-2.0 e911b767e -> 815b2382a
Switch from crc32 to adler32 for compressed sstable checksums and change to checksum the post-compressed data Patch by tjake; reviewed by jbellis for (CASSANDRA-5862) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/815b2382 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/815b2382 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/815b2382 Branch: refs/heads/cassandra-2.0 Commit: 815b2382a34dc25f005c9cc1bde78e10bc5d2ae0 Parents: e911b76 Author: Jake Luciani <[email protected]> Authored: Thu Aug 8 17:21:40 2013 -0400 Committer: Jake Luciani <[email protected]> Committed: Thu Aug 8 17:23:57 2013 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compress/CompressedRandomAccessReader.java | 14 ++++++++++++-- .../io/compress/CompressedSequentialWriter.java | 7 ++++--- .../io/compress/CompressionMetadata.java | 6 ++++-- .../apache/cassandra/io/sstable/Descriptor.java | 6 +++++- .../compress/CompressedInputStream.java | 17 ++++++++++++++--- .../compress/CompressedStreamReader.java | 2 +- test/data/serialization/2.0/db.RowMutation.bin | Bin 3599 -> 3599 bytes .../CompressedRandomAccessReaderTest.java | 4 ++-- .../compress/CompressedInputStreamTest.java | 2 +- 10 files changed, 44 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 249c343..3379a80 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 2.0.1 * Notify indexer of columns shadowed by range tombstones (CASSANDRA-5614) * Log Merkle tree stats (CASSANDRA-2698) + * Switch from crc32 to adler32 for compressed sstable checksums (CASSANDRA-5862) 2.0.0 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java index 3269e4c..b6cffa2 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java @@ -19,6 +19,7 @@ package org.apache.cassandra.io.compress; import java.io.*; import java.nio.ByteBuffer; +import java.util.zip.Adler32; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -65,7 +66,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader private ByteBuffer compressed; // re-use single crc object - private final Checksum checksum = new CRC32(); + private final Checksum checksum; // raw checksum bytes private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]); @@ -74,6 +75,7 @@ public class CompressedRandomAccessReader extends RandomAccessReader { super(new File(dataFilePath), metadata.chunkLength(), owner); this.metadata = metadata; + checksum = metadata.hasPostCompressionAdlerChecksums ? 
new Adler32() : new CRC32(); compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]); } @@ -122,7 +124,15 @@ public class CompressedRandomAccessReader extends RandomAccessReader if (metadata.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) { - checksum.update(buffer, 0, validBufferBytes); + + if (metadata.hasPostCompressionAdlerChecksums) + { + checksum.update(compressed.array(), 0, chunk.length); + } + else + { + checksum.update(buffer, 0, validBufferBytes); + } if (checksum(chunk) != (int) checksum.getValue()) throw new CorruptBlockException(getPath(), chunk); http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 00eb5a7..386eca5 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.compress; import java.io.EOFException; import java.io.File; import java.io.IOException; +import java.util.zip.Adler32; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -55,7 +56,7 @@ public class CompressedSequentialWriter extends SequentialWriter // holds a number of already written chunks private int chunkCount = 0; - private final Checksum checksum = new CRC32(); + private final Checksum checksum = new Adler32(); private long originalSize = 0, compressedSize = 0; @@ -126,7 +127,7 @@ public class CompressedSequentialWriter extends SequentialWriter compressedSize += compressedLength; // update checksum - checksum.update(buffer, 0, validBufferBytes); + checksum.update(compressed.buffer, 0, 
compressedLength); try { @@ -204,7 +205,7 @@ public class CompressedSequentialWriter extends SequentialWriter throw new CorruptBlockException(getPath(), chunkOffset, chunkSize); } - checksum.update(buffer, 0, validBytes); + checksum.update(compressed.buffer, 0, chunkSize); if (out.readInt() != (int) checksum.getValue()) throw new CorruptBlockException(getPath(), chunkOffset, chunkSize); http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 93b0091..b6d8e1b 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -42,6 +42,7 @@ public class CompressionMetadata { public final long dataLength; public final long compressedFileLength; + public final boolean hasPostCompressionAdlerChecksums; private final Memory chunkOffsets; public final String indexFilePath; public final CompressionParameters parameters; @@ -60,13 +61,14 @@ public class CompressionMetadata public static CompressionMetadata create(String dataFilePath) { Descriptor desc = Descriptor.fromFilename(dataFilePath); - return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length()); + return new CompressionMetadata(desc.filenameFor(Component.COMPRESSION_INFO), new File(dataFilePath).length(), desc.version.hasPostCompressionAdlerChecksums); } @VisibleForTesting - CompressionMetadata(String indexFilePath, long compressedLength) + CompressionMetadata(String indexFilePath, long compressedLength, boolean hasPostCompressionAdlerChecksums) { this.indexFilePath = indexFilePath; + this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums; DataInputStream stream; try 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/io/sstable/Descriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java index ac1e55f..1b29c1c 100644 --- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java +++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java @@ -44,7 +44,7 @@ public class Descriptor public static class Version { // This needs to be at the begining for initialization sake - public static final String current_version = "ja"; + public static final String current_version = "jb"; // ic (1.2.5): omits per-row bloom filter of column names // ja (2.0.0): super columns are serialized as composites (note that there is no real format change, @@ -55,6 +55,8 @@ public class Descriptor // records bloom_filter_fp_chance in metadata component // remove data size and column count from data file (CASSANDRA-4180) // tracks max/min column values (according to comparator) + // jb (2.0.1): switch from crc32 to adler32 for compression checksums + // checksum the compressed data public static final Version CURRENT = new Version(current_version); @@ -67,6 +69,7 @@ public class Descriptor public final boolean offHeapSummaries; public final boolean hasRowSizeAndColumnCount; public final boolean tracksMaxMinColumnNames; + public final boolean hasPostCompressionAdlerChecksums; public Version(String version) { @@ -78,6 +81,7 @@ public class Descriptor offHeapSummaries = version.compareTo("ja") >= 0; hasRowSizeAndColumnCount = version.compareTo("ja") < 0; tracksMaxMinColumnNames = version.compareTo("ja") >= 0; + hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java index 3305f50..698c2fe 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.util.Iterator; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.zip.Adler32; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -52,20 +53,23 @@ public class CompressedInputStream extends InputStream // number of bytes in the buffer that are actually valid protected int validBufferBytes = -1; - private final Checksum checksum = new CRC32(); + private final Checksum checksum; // raw checksum bytes private final byte[] checksumBytes = new byte[4]; private long totalCompressedBytesRead; + private final boolean hasPostCompressionAdlerChecksums; /** * @param source Input source to read compressed data from * @param info Compression info */ - public CompressedInputStream(InputStream source, CompressionInfo info) + public CompressedInputStream(InputStream source, CompressionInfo info, boolean hasPostCompressionAdlerChecksums) { this.info = info; + this.checksum = hasPostCompressionAdlerChecksums ? 
new Adler32() : new CRC32(); + this.hasPostCompressionAdlerChecksums = hasPostCompressionAdlerChecksums; this.buffer = new byte[info.parameters.chunkLength()]; // buffer is limited to store up to 1024 chunks this.dataBuffer = new ArrayBlockingQueue<byte[]>(Math.min(info.chunks.length, 1024)); @@ -107,7 +111,14 @@ public class CompressedInputStream extends InputStream // validate crc randomly if (info.parameters.getCrcCheckChance() > FBUtilities.threadLocalRandom().nextDouble()) { - checksum.update(buffer, 0, validBufferBytes); + if (hasPostCompressionAdlerChecksums) + { + checksum.update(compressed, 0, compressed.length - checksumBytes.length); + } + else + { + checksum.update(buffer, 0, validBufferBytes); + } System.arraycopy(compressed, compressed.length - checksumBytes.length, checksumBytes, 0, checksumBytes.length); if (Ints.fromByteArray(checksumBytes) != (int) checksum.getValue()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index 44e4d5c..6f5d0f5 100644 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@ -64,7 +64,7 @@ public class CompressedStreamReader extends StreamReader SSTableWriter writer = createWriter(cfs, totalSize); - CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo); + CompressedInputStream cis = new CompressedInputStream(Channels.newInputStream(channel), compressionInfo, inputVersion.hasPostCompressionAdlerChecksums); BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis)); try { 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/data/serialization/2.0/db.RowMutation.bin ---------------------------------------------------------------------- diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin index aa75378..ff6403c 100644 Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java index 8321ae6..ee32a0e 100644 --- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java +++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java @@ -76,7 +76,7 @@ public class CompressedRandomAccessReaderTest assert f.exists(); RandomAccessReader reader = compressed - ? CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length())) + ? 
CompressedRandomAccessReader.open(filename, new CompressionMetadata(filename + ".metadata", f.length(), true)) : RandomAccessReader.open(f); String expected = "The quick brown fox jumps over the lazy dog"; assertEquals(expected.length(), reader.length()); @@ -113,7 +113,7 @@ public class CompressedRandomAccessReaderTest writer.close(); // open compression metadata and get chunk information - CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length()); + CompressionMetadata meta = new CompressionMetadata(metadata.getPath(), file.length(), true); CompressionMetadata.Chunk chunk = meta.chunkFor(0); RandomAccessReader reader = CompressedRandomAccessReader.open(file.getPath(), meta); http://git-wip-us.apache.org/repos/asf/cassandra/blob/815b2382/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java index ab311e6..027c84c 100644 --- a/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java +++ b/test/unit/org/apache/cassandra/streaming/compress/CompressedInputStreamTest.java @@ -97,7 +97,7 @@ public class CompressedInputStreamTest // read buffer using CompressedInputStream CompressionInfo info = new CompressionInfo(chunks, param); - CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info); + CompressedInputStream input = new CompressedInputStream(new ByteArrayInputStream(toRead), info, true); DataInputStream in = new DataInputStream(input); for (int i = 0; i < sections.size(); i++)
