Switch CRC component to Adler and include it for compressed sstables

patch by Marcus Eriksson, Radovan Zvoncek, and jbellis for CASSANDRA-4165
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9405ce0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9405ce0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9405ce0e

Branch: refs/heads/cassandra-2.1
Commit: 9405ce0e2d95424544561d088d3167cf328964ec
Parents: da444a6
Author: Jonathan Ellis <[email protected]>
Authored: Wed Mar 12 10:04:29 2014 -0500
Committer: Jonathan Ellis <[email protected]>
Committed: Wed Mar 12 10:04:29 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                      |  2 +
 .../io/compress/CompressedSequentialWriter.java  | 35 ++++-----
 .../apache/cassandra/io/sstable/Descriptor.java  |  3 +
 .../cassandra/io/sstable/SSTableWriter.java      | 19 +++--
 .../io/util/DataIntegrityMetadata.java           | 83 ++++++------
 .../cassandra/io/util/SequentialWriter.java      | 35 +++++---
 6 files changed, 79 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 61e17e3..7a1fc60 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 2.1.0-beta2
+ * Switch CRC component to Adler and include it for compressed sstables
+   (CASSANDRA-4165)
  * Allow cassandra-stress to set compaction strategy options (CASSANDRA-6451)
  * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
  * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
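Both checksum classes implement java.util.zip.Checksum, which is what makes the swap largely mechanical; Adler-32 trades some error-detection strength for speed. For illustration only (not part of the commit), a standalone sketch of the drop-in nature of the change, using the JDK's CRC32 as a stand-in for Cassandra's PureJavaCrc32:

    import java.nio.charset.StandardCharsets;
    import java.util.zip.Adler32;
    import java.util.zip.CRC32;
    import java.util.zip.Checksum;

    public class ChecksumSwapSketch
    {
        // works for either implementation because both satisfy Checksum
        static long checksumOf(Checksum checksum, byte[] data)
        {
            checksum.update(data, 0, data.length);
            return checksum.getValue(); // low 32 bits hold the checksum
        }

        public static void main(String[] args)
        {
            byte[] data = "sstable bytes".getBytes(StandardCharsets.UTF_8);
            System.out.println("crc32   = " + checksumOf(new CRC32(), data));
            System.out.println("adler32 = " + checksumOf(new Adler32(), data));
        }
    }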
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index 436b6dc..b8a21cc 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -26,20 +26,15 @@ import java.util.zip.Checksum;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 
 public class CompressedSequentialWriter extends SequentialWriter
 {
-    public static SequentialWriter open(String dataFilePath,
-                                        String indexFilePath,
-                                        boolean skipIOCache,
-                                        CompressionParameters parameters,
-                                        MetadataCollector sstableMetadataCollector)
-    {
-        return new CompressedSequentialWriter(new File(dataFilePath), indexFilePath, skipIOCache, parameters, sstableMetadataCollector);
-    }
+    private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
 
     // holds offset in the file where current chunk should be written
     // changed only by flush() method where data buffer gets compressed and stored to the file
@@ -55,14 +50,12 @@ public class CompressedSequentialWriter
     // holds a number of already written chunks
     private int chunkCount = 0;
 
-    private final Checksum checksum = new Adler32();
-
     private long originalSize = 0, compressedSize = 0;
 
     private final MetadataCollector sstableMetadataCollector;
 
     public CompressedSequentialWriter(File file,
-                                      String indexFilePath,
+                                      String offsetsPath,
                                       boolean skipIOCache,
                                       CompressionParameters parameters,
                                       MetadataCollector sstableMetadataCollector)
@@ -74,10 +67,11 @@ public class CompressedSequentialWriter
         compressed = new ICompressor.WrappedArray(new byte[compressor.initialCompressedBufferLength(buffer.length)]);
 
         /* Index File (-CompressionInfo.db component) and it's header */
-        metadataWriter = CompressionMetadata.Writer.open(indexFilePath);
+        metadataWriter = CompressionMetadata.Writer.open(offsetsPath);
         metadataWriter.writeHeader(parameters);
 
         this.sstableMetadataCollector = sstableMetadataCollector;
+        crcMetadata = new DataIntegrityMetadata.ChecksumWriter(out);
     }
 
     @Override
@@ -125,9 +119,6 @@ public class CompressedSequentialWriter
         originalSize += validBufferBytes;
         compressedSize += compressedLength;
 
-        // update checksum
-        checksum.update(compressed.buffer, 0, compressedLength);
-
         try
         {
             // write an offset of the newly written chunk to the index file
@@ -139,16 +130,13 @@
             // write data itself
             out.write(compressed.buffer, 0, compressedLength);
             // write corresponding checksum
-            out.writeInt((int) checksum.getValue());
+            crcMetadata.append(compressed.buffer, 0, compressedLength);
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, getPath());
         }
 
-        // reset checksum object to the blank state for re-use
-        checksum.reset();
-
         // next chunk should be written right after current + length of the checksum (int)
         chunkOffset += compressedLength + 4;
     }
@@ -203,6 +191,7 @@ public class CompressedSequentialWriter
                 throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
             }
 
+            Checksum checksum = new Adler32();
             checksum.update(compressed.buffer, 0, chunkSize);
 
             if (out.readInt() != (int) checksum.getValue())
@@ -221,8 +210,6 @@ public class CompressedSequentialWriter
             throw new FSReadError(e, getPath());
         }
 
-        checksum.reset();
-
         // reset buffer
         validBufferBytes = realMark.bufferOffset;
         bufferOffset = current - validBufferBytes;
@@ -270,6 +257,12 @@ public class CompressedSequentialWriter
         }
     }
 
+    @Override
+    public void writeFullChecksum(Descriptor descriptor)
+    {
+        crcMetadata.writeFullChecksum(descriptor);
+    }
+
     /**
      * Class to hold a mark to the position of the file
      */
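For illustration (not from the patch): the per-chunk layout this writer produces is the compressed payload immediately followed by a 4-byte Adler32 of those same bytes, which is why chunkOffset advances by compressedLength + 4. A minimal sketch with a hypothetical writeChunk helper:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.zip.Adler32;

    public class ChunkLayoutSketch
    {
        static void writeChunk(DataOutputStream out, byte[] compressed, int length) throws IOException
        {
            out.write(compressed, 0, length);      // chunk payload
            Adler32 adler = new Adler32();
            adler.update(compressed, 0, length);   // checksum covers the compressed bytes
            out.writeInt((int) adler.getValue());  // 4-byte per-chunk trailer
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeChunk(new DataOutputStream(bytes), new byte[] { 1, 2, 3 }, 3);
            System.out.println("chunk + trailer = " + bytes.size() + " bytes"); // 3 + 4
        }
    }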
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/sstable/Descriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/Descriptor.java b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
index a2a27d8..4803ae7 100644
--- a/src/java/org/apache/cassandra/io/sstable/Descriptor.java
+++ b/src/java/org/apache/cassandra/io/sstable/Descriptor.java
@@ -62,6 +62,7 @@ public class Descriptor
     //             checksum the compressed data
     // ka (2.1.0): new Statistics.db file format
     //             index summaries can be downsampled and the sampling level is persisted
+    //             switch uncompressed checksums to adler32
 
     public static final Version CURRENT = new Version(current_version);
 
@@ -77,6 +78,7 @@ public class Descriptor
         public final boolean hasPostCompressionAdlerChecksums;
         public final boolean hasSamplingLevel;
         public final boolean newStatsFile;
+        public final boolean hasAllAdlerChecksums;
         public final boolean hasRepairedAt;
 
         public Version(String version)
@@ -92,6 +94,7 @@
             hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
             hasSamplingLevel = version.compareTo("ka") >= 0;
             newStatsFile = version.compareTo("ka") >= 0;
+            hasAllAdlerChecksums = version.compareTo("ka") >= 0;
             hasRepairedAt = version.compareTo("ka") >= 0;
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2d1858f..a7fd881 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.io.compress.CompressedSequentialWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.sstable.metadata.MetadataType;
@@ -76,7 +75,8 @@ public class SSTableWriter extends SSTable
                                                          Component.PRIMARY_INDEX,
                                                          Component.STATS,
                                                          Component.SUMMARY,
-                                                         Component.TOC));
+                                                         Component.TOC,
+                                                         Component.DIGEST));
 
         if (metadata.getBloomFilterFpChance() < 1.0)
             components.add(Component.FILTER);
@@ -89,7 +89,6 @@ public class SSTableWriter extends SSTable
         {
             // it would feel safer to actually add this component later in maybeWriteDigest(),
             // but the components are unmodifiable after construction
-            components.add(Component.DIGEST);
             components.add(Component.CRC);
         }
         return components;
@@ -112,17 +111,16 @@ public class SSTableWriter extends SSTable
         if (compression)
         {
             dbuilder = SegmentedFile.getCompressedBuilder();
-            dataFile = CompressedSequentialWriter.open(getFilename(),
-                                                       descriptor.filenameFor(Component.COMPRESSION_INFO),
-                                                       !metadata.populateIoCacheOnFlush(),
-                                                       metadata.compressionParameters(),
-                                                       sstableMetadataCollector);
+            dataFile = SequentialWriter.open(getFilename(),
+                                             descriptor.filenameFor(Component.COMPRESSION_INFO),
+                                             !metadata.populateIoCacheOnFlush(),
+                                             metadata.compressionParameters(),
+                                             sstableMetadataCollector);
         }
         else
         {
             dbuilder = SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
-            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush());
-            dataFile.setDataIntegrityWriter(DataIntegrityMetadata.checksumWriter(descriptor));
+            dataFile = SequentialWriter.open(new File(getFilename()), !metadata.populateIoCacheOnFlush(), new File(descriptor.filenameFor(Component.CRC)));
         }
 
         this.sstableMetadataCollector = sstableMetadataCollector;
@@ -369,6 +367,7 @@
 
     private Pair<Descriptor, StatsMetadata> close(long repairedAt)
     {
+        dataFile.writeFullChecksum(descriptor);
         // index and filter
         iwriter.close();
         // main data, close will truncate if necessary
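For illustration (not from the patch): Descriptor.Version gates features by comparing short lowercase format names ("jb", "ka", ...) lexicographically, so a plain String.compareTo is enough to ask whether an sstable is new enough for a given feature. A sketch of the idiom with the two flags this commit touches:

    public class VersionFlagSketch
    {
        public static void main(String[] args)
        {
            for (String version : new String[] { "ja", "jb", "ka" })
            {
                // same expressions as in Descriptor.Version above
                boolean hasPostCompressionAdlerChecksums = version.compareTo("jb") >= 0;
                boolean hasAllAdlerChecksums = version.compareTo("ka") >= 0;
                System.out.printf("%s: compressed-adler=%b all-adler=%b%n",
                                  version, hasPostCompressionAdlerChecksums, hasAllAdlerChecksums);
            }
        }
    }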
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index f3c6c63..797b964 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -17,19 +17,21 @@
  */
 package org.apache.cassandra.io.util;
 
+import java.io.BufferedWriter;
 import java.io.Closeable;
+import java.io.DataOutput;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
-import java.nio.channels.ClosedChannelException;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.regex.Pattern;
+import java.nio.file.Files;
+import java.util.zip.Adler32;
 import java.util.zip.Checksum;
 
+import com.google.common.base.Charsets;
+
+import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.utils.Hex;
 import org.apache.cassandra.utils.PureJavaCrc32;
 
 public class DataIntegrityMetadata
@@ -41,24 +43,23 @@ public class DataIntegrityMetadata
 
     public static class ChecksumValidator implements Closeable
    {
-        private final Checksum checksum = new PureJavaCrc32();
+        private final Checksum checksum;
         private final RandomAccessReader reader;
         private final Descriptor descriptor;
         public final int chunkSize;
 
-        public ChecksumValidator(Descriptor desc) throws IOException
+        public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this.descriptor = desc;
-            reader = RandomAccessReader.open(new File(desc.filenameFor(Component.CRC)));
+            this.descriptor = descriptor;
+            checksum = descriptor.version.hasAllAdlerChecksums ? new Adler32() : new PureJavaCrc32();
+            reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
             chunkSize = reader.readInt();
         }
 
         public void seek(long offset)
         {
             long start = chunkStart(offset);
-            reader.seek(((start / chunkSize) * 4L) + 4); // 8 byte checksum per
-                                                         // chunk + 4 byte
-                                                         // header/chunkLength
+            reader.seek(((start / chunkSize) * 4L) + 4); // 8 byte checksum per chunk + 4 byte header/chunkLength
         }
 
         public long chunkStart(long offset)
@@ -83,38 +84,22 @@ public class DataIntegrityMetadata
         }
     }
 
-    public static ChecksumWriter checksumWriter(Descriptor desc)
-    {
-        return new ChecksumWriter(desc);
-    }
-
-    public static class ChecksumWriter implements Closeable
+    public static class ChecksumWriter
     {
-        private final Checksum checksum = new PureJavaCrc32();
-        private final MessageDigest digest;
-        private final SequentialWriter writer;
-        private final Descriptor descriptor;
+        private final Checksum incrementalChecksum = new Adler32();
+        private final DataOutput incrementalOut;
+        private final Checksum fullChecksum = new Adler32();
 
-        public ChecksumWriter(Descriptor desc)
+        public ChecksumWriter(DataOutput incrementalOut)
         {
-            this.descriptor = desc;
-            writer = SequentialWriter.open(new File(desc.filenameFor(Component.CRC)), true);
-            try
-            {
-                digest = MessageDigest.getInstance("SHA-1");
-            }
-            catch (NoSuchAlgorithmException e)
-            {
-                // SHA-1 is standard in java 6
-                throw new RuntimeException(e);
-            }
+            this.incrementalOut = incrementalOut;
         }
 
         public void writeChunkSize(int length)
         {
             try
             {
-                writer.stream.writeInt(length);
+                incrementalOut.writeInt(length);
             }
             catch (IOException e)
             {
@@ -126,11 +111,11 @@
         {
             try
             {
-                checksum.update(buffer, start, end);
-                writer.stream.writeInt((int) checksum.getValue());
-                checksum.reset();
+                incrementalChecksum.update(buffer, start, end);
+                incrementalOut.writeInt((int) incrementalChecksum.getValue());
+                incrementalChecksum.reset();
 
-                digest.update(buffer, start, end);
+                fullChecksum.update(buffer, start, end);
             }
             catch (IOException e)
             {
@@ -138,24 +123,18 @@
             }
         }
 
-        public void close()
+        public void writeFullChecksum(Descriptor descriptor)
         {
-            FileUtils.closeQuietly(writer);
-            byte[] bytes = digest.digest();
-            if (bytes == null)
-                return;
-            SequentialWriter out = SequentialWriter.open(new File(descriptor.filenameFor(Component.DIGEST)), true);
-            // Writting output compatible with sha1sum
-            Descriptor newdesc = descriptor.asTemporary(false);
-            String[] tmp = newdesc.filenameFor(Component.DATA).split(Pattern.quote(File.separator));
-            String dataFileName = tmp[tmp.length - 1];
+            File outFile = new File(descriptor.filenameFor(Component.DIGEST));
+            BufferedWriter out = null;
             try
             {
-                out.write(String.format("%s %s", Hex.bytesToHex(bytes), dataFileName).getBytes());
+                out = Files.newBufferedWriter(outFile.toPath(), Charsets.UTF_8);
+                out.write(String.valueOf(fullChecksum.getValue()));
             }
-            catch (ClosedChannelException e)
+            catch (IOException e)
             {
-                throw new AssertionError(); // can't happen.
+                throw new FSWriteError(e, outFile);
             }
             finally
             {
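For illustration (not from the patch): with this change the -Digest.db component holds the full-file Adler32 value as a plain decimal string (the old format was a sha1sum-compatible line). A hypothetical reader sketch; real code would stream the data file rather than load it whole:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.zip.Adler32;

    public class DigestCheckSketch
    {
        // returns true when the -Digest.db content matches the data file's Adler32
        static boolean matches(Path dataFile, Path digestFile) throws IOException
        {
            Adler32 adler = new Adler32();
            byte[] data = Files.readAllBytes(dataFile); // fine for a sketch
            adler.update(data, 0, data.length);
            String expected = new String(Files.readAllBytes(digestFile), StandardCharsets.UTF_8).trim();
            return expected.equals(String.valueOf(adler.getValue()));
        }
    }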
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9405ce0e/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index a69391e..c4ec653 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -23,6 +23,10 @@ import java.nio.channels.ClosedChannelException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
+import org.apache.cassandra.io.compress.CompressedSequentialWriter;
+import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CLibrary;
 
 /**
@@ -59,7 +63,6 @@ public class SequentialWriter extends OutputStream
     private int bytesSinceTrickleFsync = 0;
 
     public final DataOutputStream stream;
-    private DataIntegrityMetadata.ChecksumWriter metadata;
 
     public SequentialWriter(File file, int bufferSize, boolean skipIOCache)
     {
@@ -107,6 +110,20 @@
         return new SequentialWriter(file, bufferSize, skipIOCache);
     }
 
+    public static ChecksummedSequentialWriter open(File file, boolean skipIOCache, File crcPath)
+    {
+        return new ChecksummedSequentialWriter(file, RandomAccessReader.DEFAULT_BUFFER_SIZE, skipIOCache, crcPath);
+    }
+
+    public static CompressedSequentialWriter open(String dataFilePath,
+                                                  String offsetsPath,
+                                                  boolean skipIOCache,
+                                                  CompressionParameters parameters,
+                                                  MetadataCollector sstableMetadataCollector)
+    {
+        return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, skipIOCache, parameters, sstableMetadataCollector);
+    }
+
     public void write(int value) throws ClosedChannelException
     {
         if (current >= bufferOffset + buffer.length)
@@ -273,9 +290,6 @@
        {
             throw new FSWriteError(e, getPath());
         }
-
-        if (metadata != null)
-            metadata.append(buffer, 0, validBufferBytes);
     }
 
     public long getFilePointer()
@@ -401,21 +415,12 @@
             throw new FSWriteError(e, getPath());
         }
 
-        FileUtils.closeQuietly(metadata);
         CLibrary.tryCloseFD(directoryFD);
     }
 
-    /**
-     * Turn on digest computation on this writer.
-     * This can only be called before any data is written to this write,
-     * otherwise an IllegalStateException is thrown.
-     */
-    public void setDataIntegrityWriter(DataIntegrityMetadata.ChecksumWriter writer)
+    // hack to make life easier for subclasses
+    public void writeFullChecksum(Descriptor descriptor)
     {
-        if (current != 0)
-            throw new IllegalStateException();
-        metadata = writer;
-        metadata.writeChunkSize(buffer.length);
     }
 
     /**
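For illustration (not from the patch): writeFullChecksum is deliberately a no-op on the base writer so SSTableWriter.close() can call it on any data file, and checksumming writers override it. A simplified sketch of the pattern with hypothetical class names and bodies:

    import java.util.zip.Adler32;

    class BaseWriter
    {
        // no-op by default, as in SequentialWriter ("hack to make life easier for subclasses")
        public void writeFullChecksum(String descriptorPath) {}
    }

    class ChecksummingWriter extends BaseWriter
    {
        private final Adler32 fullChecksum = new Adler32();

        void append(byte[] buffer, int start, int length)
        {
            fullChecksum.update(buffer, start, length); // running whole-file checksum
        }

        @Override
        public void writeFullChecksum(String descriptorPath)
        {
            // a real implementation writes this value to the -Digest.db component
            System.out.println(descriptorPath + ": " + fullChecksum.getValue());
        }
    }

    public class WriteFullChecksumSketch
    {
        public static void main(String[] args)
        {
            BaseWriter plain = new BaseWriter();
            ChecksummingWriter checked = new ChecksummingWriter();
            checked.append(new byte[] { 1, 2, 3 }, 0, 3);
            plain.writeFullChecksum("plain-Data.db");     // silently does nothing
            checked.writeFullChecksum("checked-Data.db"); // prints the Adler32 value
        }
    }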
