Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 39ee04088 -> a472aa9ea
Add checksum to saved cache files

patch by Daniel Chia; reviewed by Ariel Weisberg for CASSANDRA-9265

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fa6205c9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fa6205c9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fa6205c9

Branch: refs/heads/cassandra-3.0
Commit: fa6205c909656b09165da4b5ca469328a6450917
Parents: 7636a6b
Author: Daniel Chia <[email protected]>
Authored: Wed Aug 5 18:46:30 2015 -0400
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Aug 6 21:48:14 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |  96 ++++++++------
 .../cassandra/config/DatabaseDescriptor.java    |  13 +-
 .../io/util/ChecksummedRandomAccessReader.java  | 103 +++++++++++++++
 .../io/util/DataIntegrityMetadata.java          |  16 ++-
 .../io/ChecksummedRandomAccessReaderTest.java   | 127 +++++++++++++++++++
 6 files changed, 311 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 72ad3cd..ff0fdda 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.1
+ * Add checksum to saved cache files (CASSANDRA-9265)
  * Log warning when using an aggregate without partition key (CASSANDRA-9737)
  * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
  * UDF / UDA execution time in trace (CASSANDRA-9723)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
index a204a18..05653ba 100644
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.Pair;
@@ -48,8 +49,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 {
     public interface IStreamFactory
     {
-        public InputStream getInputStream(File path) throws FileNotFoundException;
-        public OutputStream getOutputStream(File path) throws FileNotFoundException;
+        public InputStream getInputStream(File dataPath, File crcPath) throws IOException;
+        public SequentialWriter getOutputWriter(File dataPath, File crcPath) throws FileNotFoundException;
     }
 
     private static final Logger logger = LoggerFactory.getLogger(AutoSavingCache.class);
@@ -61,18 +62,18 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
     protected final CacheService.CacheType cacheType;
     private final CacheSerializer<K, V> cacheLoader;
 
-    private static final String CURRENT_VERSION = "b";
+    private static final String CURRENT_VERSION = "c";
 
     private static volatile IStreamFactory streamFactory = new IStreamFactory()
     {
-        public InputStream getInputStream(File path) throws FileNotFoundException
+        public InputStream getInputStream(File dataPath, File crcPath) throws IOException
         {
-            return new FileInputStream(path);
+            return ChecksummedRandomAccessReader.open(dataPath, crcPath);
         }
 
-        public OutputStream getOutputStream(File path) throws FileNotFoundException
+        public SequentialWriter getOutputWriter(File dataPath, File crcPath)
         {
-            return new FileOutputStream(path);
+            return SequentialWriter.open(dataPath, crcPath);
         }
     };
 
@@ -89,10 +90,16 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         this.cacheLoader = cacheloader;
     }
 
-    public File getCachePath(UUID cfId, String version)
+    public File getCacheDataPath(UUID cfId, String version)
     {
         Pair<String, String> names = Schema.instance.getCF(cfId);
-        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version);
+        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "db");
+    }
+
+    public File getCacheCrcPath(UUID cfId, String version)
+    {
+        Pair<String, String> names = Schema.instance.getCF(cfId);
+        return DatabaseDescriptor.getSerializedCachePath(names.left, names.right, cfId, cacheType, version, "crc");
     }
 
     public Writer getWriter(int keysToSave)
@@ -129,14 +136,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
         long start = System.nanoTime();
 
         // modern format, allows both key and value (so key cache load can be purely sequential)
-        File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
-        if (path.exists())
+        File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
+        File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+        if (dataPath.exists() && crcPath.exists())
         {
             DataInputStream in = null;
             try
             {
-                logger.info(String.format("reading saved cache %s", path));
-                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+                logger.info(String.format("reading saved cache %s", dataPath));
+                in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
                 List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                 while (in.available() > 0)
                 {
@@ -155,10 +163,15 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                     put(entry.left, entry.right);
                 }
             }
+            catch (CorruptFileException e)
+            {
+                JVMStabilityInspector.inspectThrowable(e);
+                logger.warn(String.format("Non-fatal checksum error reading saved cache %s", dataPath.getAbsolutePath()), e);
+            }
             catch (Exception e)
            {
                 JVMStabilityInspector.inspectThrowable(e);
-                logger.debug(String.format("harmless error reading saved cache %s", path.getAbsolutePath()), e);
+                logger.debug(String.format("harmless error reading saved cache %s", dataPath.getAbsolutePath()), e);
             }
             finally
             {
@@ -167,7 +180,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
             }
             if (logger.isDebugEnabled())
                 logger.debug("completed reading ({} ms; {} keys) saved cache {}",
-                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, path);
+                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), count, dataPath);
         }
         return count;
     }
@@ -241,9 +254,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
 
             long start = System.nanoTime();
 
-            HashMap<UUID, DataOutputPlus> writers = new HashMap<>();
-            HashMap<UUID, OutputStream> streams = new HashMap<>();
-            HashMap<UUID, File> paths = new HashMap<>();
+            HashMap<UUID, DataOutputPlus> dataOutputs = new HashMap<>();
+            HashMap<UUID, SequentialWriter> sequentialWriters = new HashMap<>();
+            HashMap<UUID, Pair<File, File>> paths = new HashMap<>();
 
             try
             {
@@ -254,23 +267,23 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                 if (!Schema.instance.hasCF(key.getCFId()))
                     continue; // the table has been dropped.
 
-                DataOutputPlus writer = writers.get(cfId);
+                DataOutputPlus writer = dataOutputs.get(cfId);
                 if (writer == null)
                 {
-                    File writerPath = tempCacheFile(cfId);
-                    OutputStream stream;
+                    Pair<File, File> cacheFilePaths = tempCacheFiles(cfId);
+                    SequentialWriter sequentialWriter;
                     try
                     {
-                        stream = streamFactory.getOutputStream(writerPath);
-                        writer = new WrappedDataOutputStreamPlus(stream);
+                        sequentialWriter = streamFactory.getOutputWriter(cacheFilePaths.left, cacheFilePaths.right);
+                        writer = new WrappedDataOutputStreamPlus(sequentialWriter);
                     }
                     catch (FileNotFoundException e)
                     {
                         throw new RuntimeException(e);
                     }
 
-                    paths.put(cfId, writerPath);
-                    streams.put(cfId, stream);
-                    writers.put(cfId, writer);
+                    paths.put(cfId, cacheFilePaths);
+                    sequentialWriters.put(cfId, sequentialWriter);
+                    dataOutputs.put(cfId, writer);
                 }
 
                 try
@@ -279,7 +292,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                 }
                 catch (IOException e)
                 {
-                    throw new FSWriteError(e, paths.get(cfId));
+                    throw new FSWriteError(e, paths.get(cfId).left);
                 }
 
                 keysWritten++;
@@ -299,29 +312,40 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K
                         // not thrown (by OHC)
                     }
 
-                for (OutputStream writer : streams.values())
+                for (SequentialWriter writer : sequentialWriters.values())
+                {
+                    writer.finish();
                     FileUtils.closeQuietly(writer);
+                }
             }
 
-            for (Map.Entry<UUID, DataOutputPlus> entry : writers.entrySet())
+            for (Map.Entry<UUID, DataOutputPlus> entry : dataOutputs.entrySet())
             {
                 UUID cfId = entry.getKey();
-                File tmpFile = paths.get(cfId);
-                File cacheFile = getCachePath(cfId, CURRENT_VERSION);
+                Pair<File, File> tmpFiles = paths.get(cfId);
+                File cacheFile = getCacheDataPath(cfId, CURRENT_VERSION);
+                File crcFile = getCacheCrcPath(cfId, CURRENT_VERSION);
 
                 cacheFile.delete(); // ignore error if it didn't exist
-                if (!tmpFile.renameTo(cacheFile))
-                    logger.error("Unable to rename {} to {}", tmpFile, cacheFile);
+                crcFile.delete();
+
+                if (!tmpFiles.left.renameTo(cacheFile))
+                    logger.error("Unable to rename {} to {}", tmpFiles.left, cacheFile);
+
+                if (!tmpFiles.right.renameTo(crcFile))
+                    logger.error("Unable to rename {} to {}", tmpFiles.right, crcFile);
             }
 
             logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
         }
 
-        private File tempCacheFile(UUID cfId)
+        private Pair<File, File> tempCacheFiles(UUID cfId)
         {
-            File path = getCachePath(cfId, CURRENT_VERSION);
-            return FileUtils.createTempFile(path.getName(), null, path.getParentFile());
+            File dataPath = getCacheDataPath(cfId, CURRENT_VERSION);
+            File crcPath = getCacheCrcPath(cfId, CURRENT_VERSION);
+            return Pair.create(FileUtils.createTempFile(dataPath.getName(), null, dataPath.getParentFile()),
+                               FileUtils.createTempFile(crcPath.getName(), null, crcPath.getParentFile()));
         }
 
         private void deleteOldCacheFiles()
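
The save path above now produces a temporary data/crc pair and publishes the two files together, since a data file from one save paired with a crc file from another would fail validation on the next startup. A minimal standalone sketch of that publish step (class and method names are invented for illustration; the patch itself logs rename failures instead of throwing):

    import java.io.File;
    import java.io.IOException;

    final class CachePublishSketch
    {
        // Move both halves of the data/crc pair into place. Publishing only
        // one half would pair fresh data with stale checksums (or vice versa),
        // and the next load would reject the cache.
        static void publish(File tmpData, File tmpCrc, File data, File crc) throws IOException
        {
            data.delete(); // ignore error if the previous generation didn't exist
            crc.delete();

            if (!tmpData.renameTo(data))
                throw new IOException("Unable to rename " + tmpData + " to " + data);
            if (!tmpCrc.renameTo(crc))
                throw new IOException("Unable to rename " + tmpCrc + " to " + crc);
        }
    }
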
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5589bc2..e7c76ff 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1122,7 +1122,7 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_segment_size_in_mb * 1024 * 1024;
     }
-    
+
     public static void setCommitLogSegmentSize(int sizeMegabytes)
     {
         conf.commitlog_segment_size_in_mb = sizeMegabytes;
@@ -1271,7 +1271,7 @@ public class DatabaseDescriptor
     {
         return conf.commitlog_sync_period_in_ms;
     }
-    
+
     public static void setCommitLogSyncPeriod(int periodMillis)
     {
         conf.commitlog_sync_period_in_ms = periodMillis;
@@ -1405,14 +1405,19 @@ public class DatabaseDescriptor
         return conf.max_hint_window_in_ms;
     }
 
-    public static File getSerializedCachePath(String ksName, String cfName, UUID cfId, CacheService.CacheType cacheType, String version)
+    public static File getSerializedCachePath(String ksName,
+                                              String cfName,
+                                              UUID cfId,
+                                              CacheService.CacheType cacheType,
+                                              String version,
+                                              String extension)
     {
         StringBuilder builder = new StringBuilder();
         builder.append(ksName).append('-');
         builder.append(cfName).append('-');
         builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-');
         builder.append(cacheType);
-        builder.append((version == null ? "" : "-" + version + ".db"));
+        builder.append((version == null ? "" : "-" + version + "." + extension));
 
         return new File(conf.saved_caches_directory, builder.toString());
     }
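
With the new extension argument, each saved cache resolves to a sibling pair of files under saved_caches_directory. A small worked example of the names the StringBuilder above produces (keyspace, table, and id values are invented):

    // Hypothetical inputs; the real values come from Schema and CacheService.
    String ksName = "ks1", cfName = "users", version = "c";
    String cfIdHex = "0123456789abcdef0123456789abcdef"; // ByteBufferUtil.bytesToHex of the table id
    String cacheType = "KeyCache";

    String dataName = ksName + '-' + cfName + '-' + cfIdHex + '-' + cacheType + '-' + version + ".db";
    String crcName  = ksName + '-' + cfName + '-' + cfIdHex + '-' + cacheType + '-' + version + ".crc";
    // dataName == "ks1-users-0123456789abcdef0123456789abcdef-KeyCache-c.db"
    // crcName  == "ks1-users-0123456789abcdef0123456789abcdef-KeyCache-c.crc"
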
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
new file mode 100644
index 0000000..60b193a
--- /dev/null
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.zip.Adler32;
+
+import org.apache.cassandra.io.compress.BufferType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class ChecksummedRandomAccessReader extends RandomAccessReader
+{
+    @SuppressWarnings("serial")
+    public static class CorruptFileException extends RuntimeException
+    {
+        public final File file;
+
+        public CorruptFileException(Exception cause, File file) {
+            this.file = file;
+        }
+    }
+
+    private final DataIntegrityMetadata.ChecksumValidator validator;
+    private final File file;
+
+    protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException {
+        super(channel, validator.chunkSize, -1, BufferType.ON_HEAP, null);
+        this.validator = validator;
+        this.file = file;
+    }
+
+    public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
+    {
+        try (ChannelProxy channel = new ChannelProxy(file))
+        {
+            RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
+            DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
+                                                                                                            crcReader,
+                                                                                                            file.getPath());
+            return new ChecksummedRandomAccessReader(file, channel, validator);
+        }
+    }
+
+    protected void reBuffer()
+    {
+        long desiredPosition = current();
+        // align with buffer size, as checksums were computed in chunks of buffer size each.
+        bufferOffset = (desiredPosition / buffer.capacity()) * buffer.capacity();
+
+        buffer.clear();
+
+        long position = bufferOffset;
+        while (buffer.hasRemaining())
+        {
+            int n = channel.read(buffer, position);
+            if (n < 0)
+                break;
+            position += n;
+        }
+
+        buffer.flip();
+
+        try
+        {
+            validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+        }
+        catch (IOException e)
+        {
+            throw new CorruptFileException(e, file);
+        }
+
+        buffer.position((int) (desiredPosition - bufferOffset));
+    }
+
+    public void seek(long newPosition)
+    {
+        validator.seek(newPosition);
+        super.seek(newPosition);
+    }
+
+    public void close()
+    {
+        super.close();
+        validator.close();
+    }
+}
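
Since reBuffer() validates every chunk it loads, any read that touches a corrupt chunk surfaces as CorruptFileException rather than as silently wrong bytes. A hedged usage sketch (file paths and the helper class are invented; reader.length() is assumed to report the data file length, as on RandomAccessReader):

    import java.io.File;
    import java.io.IOException;

    import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
    import org.apache.cassandra.io.util.RandomAccessReader;

    final class ChecksummedReadSketch
    {
        // Returns the verified contents, or null if any chunk fails validation.
        static byte[] readAll(File dataFile, File crcFile) throws IOException
        {
            RandomAccessReader reader = ChecksummedRandomAccessReader.open(dataFile, crcFile);
            try
            {
                byte[] bytes = new byte[(int) reader.length()];
                reader.readFully(bytes); // each rebuffer checks one chunk against the crc file
                return bytes;
            }
            catch (ChecksummedRandomAccessReader.CorruptFileException e)
            {
                return null; // a chunk failed Adler32 validation; don't trust the file
            }
            finally
            {
                reader.close();
            }
        }
    }
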
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 4362cee..d44bd1c 100644
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@ -48,14 +48,20 @@ public class DataIntegrityMetadata
     {
         private final Checksum checksum;
         private final RandomAccessReader reader;
-        private final Descriptor descriptor;
         public final int chunkSize;
+        private final String dataFilename;
 
         public ChecksumValidator(Descriptor descriptor) throws IOException
         {
-            this.descriptor = descriptor;
-            checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create();
-            reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
+            this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(),
+                 RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
+                 descriptor.filenameFor(Component.DATA));
+        }
+
+        public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException
+        {
+            this.checksum = checksum;
+            this.reader = reader;
+            this.dataFilename = dataFilename;
             chunkSize = reader.readInt();
         }
 
@@ -78,7 +84,7 @@ public class DataIntegrityMetadata
             checksum.reset();
             int actual = reader.readInt();
             if (current != actual)
-                throw new IOException("Corrupted SSTable : " + descriptor.filenameFor(Component.DATA));
+                throw new IOException("Corrupted File : " + dataFilename);
         }
 
         public void close()
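
The on-disk format ChecksumValidator consumes is unchanged by this refactoring: the crc file begins with one int (the chunk size), followed by one int checksum per data chunk, which is why the constructor reads chunkSize up front. A sketch of the caller contract implied above (helper class and names are invented):

    import java.io.DataInput;
    import java.io.IOException;

    import org.apache.cassandra.io.util.DataIntegrityMetadata;

    final class ValidateSketch
    {
        // Walk the data stream chunk by chunk, in file order; validate()
        // throws IOException("Corrupted File : ...") on the first mismatch.
        static void validateAll(DataIntegrityMetadata.ChecksumValidator validator,
                                DataInput data,
                                long dataLength) throws IOException
        {
            byte[] chunk = new byte[validator.chunkSize];
            for (long remaining = dataLength; remaining > 0; )
            {
                int length = (int) Math.min(chunk.length, remaining);
                data.readFully(chunk, 0, length);
                validator.validate(chunk, 0, length);
                remaining -= length;
            }
        }
    }
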
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fa6205c9/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
new file mode 100644
index 0000000..c1e43c9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/ChecksummedRandomAccessReaderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import org.apache.cassandra.io.util.ChecksummedRandomAccessReader;
+import org.apache.cassandra.io.util.ChecksummedSequentialWriter;
+import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriter;
+
+public class ChecksummedRandomAccessReaderTest
+{
+    @Test
+    public void readFully() throws IOException
+    {
+        final File data = File.createTempFile("testReadFully", "data");
+        final File crc = File.createTempFile("testReadFully", "crc");
+
+        final byte[] expected = new byte[70 * 1024];   // bit more than crc chunk size, so we can test rebuffering
+        ThreadLocalRandom.current().nextBytes(expected);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test
+    public void seek() throws IOException
+    {
+        final File data = File.createTempFile("testSeek", "data");
+        final File crc = File.createTempFile("testSeek", "crc");
+
+        final byte[] dataBytes = new byte[70 * 1024];   // bit more than crc chunk size
+        ThreadLocalRandom.current().nextBytes(dataBytes);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(dataBytes);
+        writer.finish();
+
+        assert data.exists();
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+
+        final int seekPosition = 66000;
+        reader.seek(seekPosition);
+
+        byte[] b = new byte[dataBytes.length - seekPosition];
+        reader.readFully(b);
+
+        byte[] expected = Arrays.copyOfRange(dataBytes, seekPosition, dataBytes.length);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+
+    @Test(expected = ChecksummedRandomAccessReader.CorruptFileException.class)
+    public void corruptionDetection() throws IOException
+    {
+        final File data = File.createTempFile("corruptionDetection", "data");
+        final File crc = File.createTempFile("corruptionDetection", "crc");
+
+        final byte[] expected = new byte[5 * 1024];
+        Arrays.fill(expected, (byte) 0);
+
+        SequentialWriter writer = ChecksummedSequentialWriter.open(data, crc);
+        writer.write(expected);
+        writer.finish();
+
+        assert data.exists();
+
+        // simulate corruption of file
+        try (RandomAccessFile dataFile = new RandomAccessFile(data, "rw"))
+        {
+            dataFile.seek(1024);
+            dataFile.write((byte) 5);
+        }
+
+        RandomAccessReader reader = ChecksummedRandomAccessReader.open(data, crc);
+        byte[] b = new byte[expected.length];
+        reader.readFully(b);
+
+        assertArrayEquals(expected, b);
+
+        assertTrue(reader.isEOF());
+
+        reader.close();
+    }
+}
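
The test class above can be run on its own to exercise the round trip, the aligned seek, and the corruption case; in a Cassandra checkout of this era that is typically done with ant (assuming the standard single-test target):

    ant test -Dtest.name=ChecksummedRandomAccessReaderTest
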
