Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a472aa9e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a472aa9e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a472aa9e

Branch: refs/heads/trunk
Commit: a472aa9eaaff3f67035c53dd92b4aee24f2bed36
Parents: 39ee040 fa6205c
Author: Aleksey Yeschenko <[email protected]>
Authored: Thu Aug 6 21:54:20 2015 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Thu Aug 6 21:54:20 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cache/AutoSavingCache.java |  96 ++++++++------
 .../cassandra/config/DatabaseDescriptor.java    |  13 +-
 .../io/util/ChecksummedRandomAccessReader.java  | 110 ++++++++++++++++
 .../io/util/DataIntegrityMetadata.java          |  17 ++-
 .../io/ChecksummedRandomAccessReaderTest.java   | 127 +++++++++++++++++++
 6 files changed, 319 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 95fade9,ff0fdda..a894297
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,43 -1,6 +1,44 @@@
-2.2.1
+3.0.0-beta1
+ * Optimize batchlog replay to avoid full scans (CASSANDRA-7237)
+ * Repair improvements when using vnodes (CASSANDRA-5220)
+ * Disable scripted UDFs by default (CASSANDRA-9889)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+ * Bytecode inspection for Java-UDFs (CASSANDRA-9890)
+ * Use byte to serialize MT hash length (CASSANDRA-9792)
+Merged from 2.2:
+ * Add checksum to saved cache files (CASSANDRA-9265)
  * Log warning when using an aggregate without partition key (CASSANDRA-9737)
+Merged from 2.1:
+ * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871)
+Merged from 2.0:
+ * Don't cast expected bf size to an int (CASSANDRA-9959)
+
+
+3.0.0-alpha1
+ * Implement proper sandboxing for UDFs (CASSANDRA-9402)
+ * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
+ * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
+ * Metrics should use up to date nomenclature (CASSANDRA-9448)
+ * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)
+ * Cleanup crc and adler code for java 8 (CASSANDRA-9650)
+ * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825,
+   9848, 9705, 9859, 9867, 9874, 9828, 9801)
+ * Update Guava to 18.0 (CASSANDRA-9653)
+ * Bloom filter false positive ratio is not honoured (CASSANDRA-8413)
+ * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522)
+ * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035)
+ * Add algorithmic token allocation (CASSANDRA-7032)
+ * Add nodetool command to replay batchlog (CASSANDRA-9547)
+ * Make file buffer cache independent of paths being read (CASSANDRA-8897)
+ * Remove deprecated legacy Hadoop code (CASSANDRA-9353)
+ * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801)
+ * Change gossip stabilization to use endpoit size (CASSANDRA-9401)
+ * Change default garbage collector to G1 (CASSANDRA-7486)
+ * Populate TokenMetadata early during startup (CASSANDRA-9317)
+ * Undeprecate cache recentHitRate (CASSANDRA-6591)
+ * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865)
+ * Materialized Views (CASSANDRA-6477)
+Merged from 2.2:
  * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900)
  * UDF / UDA execution time in trace (CASSANDRA-9723)
  * Fix broken internode SSL (CASSANDRA-9884)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/cache/AutoSavingCache.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java
index 0b334f5,05653ba..3c5b6a5
--- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java
+++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java
@@@ -38,7 -39,7 +38,8 @@@ import org.apache.cassandra.db.compacti
  import org.apache.cassandra.db.marshal.BytesType;
  import org.apache.cassandra.io.FSWriteError;
  import org.apache.cassandra.io.util.*;
+ import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException;
 +import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus;
  import org.apache.cassandra.service.CacheService;
  import org.apache.cassandra.utils.JVMStabilityInspector;
  import org.apache.cassandra.utils.Pair;
@@@ -129,14 -136,15 +136,15 @@@ public class AutoSavingCache<K extends
              long start = System.nanoTime();

              // modern format, allows both key and value (so key cache load can be purely sequential)
-             File path = getCachePath(cfs.metadata.cfId, CURRENT_VERSION);
-             if (path.exists())
+             File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION);
+             File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION);
+             if (dataPath.exists() && crcPath.exists())
              {
-                 DataInputStream in = null;
+                 DataInputStreamPlus in = null;
                  try
                  {
-                     logger.info(String.format("reading saved cache %s", path));
-                     in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(path)), path.length()));
+                     logger.info(String.format("reading saved cache %s", dataPath));
-                     in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
++                    in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length()));
                      List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>();
                      while (in.available() > 0)
                      {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
index 0000000,60b193a..976ff23
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedRandomAccessReader.java
@@@ -1,0 -1,103 +1,110 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+ package org.apache.cassandra.io.util;
+
+ import java.io.File;
+ import java.io.IOException;
+ import java.util.zip.Adler32;
+
+ import org.apache.cassandra.io.compress.BufferType;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+
+ public class ChecksummedRandomAccessReader extends RandomAccessReader
+ {
+     @SuppressWarnings("serial")
+     public static class CorruptFileException extends RuntimeException
+     {
+         public final File file;
+
-         public CorruptFileException(Exception cause, File file) {
++        public CorruptFileException(Exception cause, File file)
++        {
+             this.file = file;
+         }
+     }
+
+     private final DataIntegrityMetadata.ChecksumValidator validator;
+     private final File file;
+
-     protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException {
-         super(channel, validator.chunkSize, -1, BufferType.ON_HEAP, null);
++    protected ChecksummedRandomAccessReader(File file, ChannelProxy channel, DataIntegrityMetadata.ChecksumValidator validator) throws IOException
++    {
++        super(channel, validator.chunkSize, -1, BufferType.ON_HEAP);
+         this.validator = validator;
+         this.file = file;
+     }
+
+     public static ChecksummedRandomAccessReader open(File file, File crcFile) throws IOException
+     {
-         try (ChannelProxy channel = new ChannelProxy(file))
-         {
-             RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
-             DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
-                                                                                                             crcReader,
-                                                                                                             file.getPath());
-             return new ChecksummedRandomAccessReader(file, channel, validator);
-         }
++        ChannelProxy channel = new ChannelProxy(file);
++        RandomAccessReader crcReader = RandomAccessReader.open(crcFile);
++        DataIntegrityMetadata.ChecksumValidator validator = new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
++                                                                                                        crcReader,
++                                                                                                        file.getPath());
++        return new ChecksummedRandomAccessReader(file, channel, validator);
+     }
+
+     protected void reBuffer()
+     {
+         long desiredPosition = current();
+         // align with buffer size, as checksums were computed in chunks of buffer size each.
+         bufferOffset = (desiredPosition / buffer.capacity()) * buffer.capacity();
+
+         buffer.clear();
+
+         long position = bufferOffset;
+         while (buffer.hasRemaining())
+         {
+             int n = channel.read(buffer, position);
+             if (n < 0)
+                 break;
+             position += n;
+         }
+
+         buffer.flip();
+
+         try
+         {
+             validator.validate(ByteBufferUtil.getArray(buffer), 0, buffer.remaining());
+         }
+         catch (IOException e)
+         {
+             throw new CorruptFileException(e, file);
+         }
+
+         buffer.position((int) (desiredPosition - bufferOffset));
+     }
+
+     public void seek(long newPosition)
+     {
+         validator.seek(newPosition);
+         super.seek(newPosition);
+     }
+
+     public void close()
+     {
-         super.close();
-         validator.close();
++        try
++        {
++            super.close();
++        }
++        finally
++        {
++            channel.close();
++            validator.close();
++        }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a472aa9e/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
index 073fc04,d44bd1c..ac2ab47
--- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
+++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java
@@@ -53,9 -53,15 +53,16 @@@ public class DataIntegrityMetadat
      public ChecksumValidator(Descriptor descriptor) throws IOException
      {
-         this.descriptor = descriptor;
-         checksum = descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32();
-         reader = RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC)));
-         this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : CRC32Factory.instance.create(),
++        this(descriptor.version.hasAllAdlerChecksums() ? new Adler32() : new CRC32(),
+              RandomAccessReader.open(new File(descriptor.filenameFor(Component.CRC))),
+              descriptor.filenameFor(Component.DATA));
      }
+
-     public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException {
++    public ChecksumValidator(Checksum checksum, RandomAccessReader reader, String dataFilename) throws IOException
++    {
+         this.checksum = checksum;
+         this.reader = reader;
+         this.dataFilename = dataFilename;
          chunkSize = reader.readInt();
      }
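----------------------------------------------------------------------
For illustration only (not part of the patch): a minimal caller-side sketch of
the new ChecksummedRandomAccessReader API, based solely on the signatures
visible in the diff above. The wrapper class and method names are hypothetical,
and the DataInput-style readInt() is assumed to be inherited from
RandomAccessReader; ChecksummedRandomAccessReaderTest added by this commit is
the authoritative usage.

package org.apache.cassandra.io.util;

import java.io.File;
import java.io.IOException;

public class ChecksummedReadSketch
{
    /**
     * Opens a data file together with its .crc companion and reads the first int.
     * Every reBuffer() validates the Adler32 checksum of the chunk it just read, so
     * a mismatch surfaces as ChecksummedRandomAccessReader.CorruptFileException.
     */
    public static int verifyAndReadFirstInt(File dataFile, File crcFile) throws IOException
    {
        ChecksummedRandomAccessReader reader = ChecksummedRandomAccessReader.open(dataFile, crcFile);
        try
        {
            reader.seek(0);          // seek() repositions the checksum validator along with the reader
            return reader.readInt(); // DataInput-style read inherited from RandomAccessReader (assumed)
        }
        catch (ChecksummedRandomAccessReader.CorruptFileException e)
        {
            // e.file identifies the corrupt data file; rethrow as a checked exception for callers
            throw new IOException("corrupt saved file: " + e.file, e);
        }
        finally
        {
            reader.close();
        }
    }
}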

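----------------------------------------------------------------------
Also hypothetical, not from the patch: the new ChecksumValidator(Checksum,
RandomAccessReader, String) constructor can in principle be used to check a
data file against its .crc companion without going through the reader, which is
essentially what reBuffer() does one buffer at a time. The sketch assumes the
data was checksummed in fixed-size chunks (the constructor reads the chunk size
from the head of the crc file) and that a short final chunk was checksummed
over its actual length; class and method names are illustrative.

package org.apache.cassandra.io.util;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.Adler32;

public class ManualChecksumCheckSketch
{
    /**
     * Walks the data file chunk by chunk and checks every chunk against the stored
     * checksum; ChecksumValidator.validate() throws IOException on a mismatch.
     */
    public static void verify(File dataFile, File crcFile) throws IOException
    {
        DataIntegrityMetadata.ChecksumValidator validator =
            new DataIntegrityMetadata.ChecksumValidator(new Adler32(),
                                                        RandomAccessReader.open(crcFile),
                                                        dataFile.getPath());
        try (DataInputStream in = new DataInputStream(new FileInputStream(dataFile)))
        {
            byte[] chunk = new byte[validator.chunkSize];
            long remaining = dataFile.length();
            while (remaining > 0)
            {
                int toRead = (int) Math.min(remaining, chunk.length);
                in.readFully(chunk, 0, toRead);
                validator.validate(chunk, 0, toRead); // throws IOException on a checksum mismatch
                remaining -= toRead;
            }
        }
        finally
        {
            validator.close();
        }
    }
}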